csi: make volume GC in job deregister safely async

The `Job.Deregister` call will block on the client CSI controller RPCs
while the alloc still exists on the Nomad client node, so volume claim
reaping needs to happen asynchronously from `Job.Deregister`. This
allows `nomad job stop` to return immediately. To make this work, this
changeset moves volume GC from a by-job basis to a by-volume basis, so
we no longer have to query the (possibly deleted) job at the time of
volume GC. We smuggle the volume ID, and whether the deregistration is
a purge, into the GC eval's job ID the same way we previously smuggled
the job ID.
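
For reference, the smuggling round trip is visible in the diff below: `Job.Deregister` and `Node.UpdateAlloc` build the GC eval's `JobID` by string concatenation, and `csiVolumeClaimGC` splits it back apart. The following is a minimal standalone sketch of that pattern, not the Nomad implementation itself: the helper names are hypothetical, the constant's string value is a placeholder, and only the `purge` suffix is ever checked (the non-purge suffix shows up as `:ok` or `:no` in the diff).

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-in for structs.CoreJobCSIVolumeClaimGC; the real constant lives in
// Nomad's structs package and its exact value is not reproduced here.
const coreJobCSIVolumeClaimGC = "csi-volume-claim-gc"

// encodeVolumeGCEvalID (hypothetical helper) shows how the volume ID and a
// purge marker are packed into the GC eval's JobID.
func encodeVolumeGCEvalID(volID string, purge bool) string {
	suffix := ":no" // only ":purge" is significant; anything else means "don't reap running allocs"
	if purge {
		suffix = ":purge"
	}
	return coreJobCSIVolumeClaimGC + ":" + volID + suffix
}

// decodeVolumeGCEvalID mirrors the parsing in csiVolumeClaimGC: split on ":"
// and expect exactly three parts.
func decodeVolumeGCEvalID(evalJobID string) (volID string, purge bool, ok bool) {
	parts := strings.Split(evalJobID, ":")
	if len(parts) != 3 {
		return "", false, false
	}
	return parts[1], parts[2] == "purge", true
}

func main() {
	id := encodeVolumeGCEvalID("vol-web", true)
	volID, purge, ok := decodeVolumeGCEvalID(id)
	fmt.Println(id, volID, purge, ok)
	// prints: csi-volume-claim-gc:vol-web:purge vol-web true true
}
```

One consequence of the split-on-`:` scheme is that it assumes neither the core job name nor the volume ID contains a `:` of its own.
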
author Tim Gross 2020-04-05 10:47:40 -04:00
parent 5a3b45864d
commit 027277a0d9
5 changed files with 133 additions and 126 deletions


@@ -143,12 +143,12 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
csiReq := req.ToCSIRequest()
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithPerRetryTimeout(10*time.Second),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {


@@ -157,11 +157,6 @@ OUTER:
c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))
// Clean up any outstanding volume claims
if err := c.volumeClaimReap(gcJob, eval.LeaderACL); err != nil {
return err
}
// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
@@ -720,90 +715,64 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")
// JobID smuggled in with the eval's own JobID
var jobID string
evalJobID := strings.Split(eval.JobID, ":")
if len(evalJobID) != 2 {
c.logger.Error("volume gc called without jobID")
// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {
c.logger.Error("volume gc called without volID")
return nil
}
jobID = evalJobID[1]
job, err := c.srv.State().JobByID(nil, eval.Namespace, jobID)
if err != nil || job == nil {
c.logger.Trace(
"cannot find job to perform volume claim GC. it may have been garbage collected",
"job", jobID)
return nil
}
return c.volumeClaimReap([]*structs.Job{job}, eval.LeaderACL)
volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}
// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
func (c *CoreScheduler) volumeClaimReap(jobs []*structs.Job, leaderACL string) error {
return volumeClaimReap(c.srv, c.logger, jobs, leaderACL, false)
}
func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {
// volumeClaimReap contacts the leader and releases volume claims from terminal allocs
func volumeClaimReap(srv *Server, logger log.Logger, jobs []*structs.Job, leaderACL string, runningAllocs bool) error {
ws := memdb.NewWatchSet()
vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}
gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
var result *multierror.Error
for _, job := range jobs {
logger.Trace("garbage collecting unclaimed CSI volume claims for job", "job", job.ID)
for _, taskGroup := range job.TaskGroups {
for _, tgVolume := range taskGroup.Volumes {
if tgVolume.Type != structs.VolumeTypeCSI {
continue // filter to just CSI volumes
}
volID := tgVolume.Source
vol, err := srv.State().CSIVolumeByID(ws, job.Namespace, volID)
if err != nil {
result = multierror.Append(result, err)
continue
}
if vol == nil {
logger.Trace("cannot find volume to be GC'd. it may have been deregistered",
"volume", volID)
continue
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
result = multierror.Append(result, err)
continue
}
plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
result = multierror.Append(result, err)
continue
}
gcClaims, nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
region: job.Region,
namespace: job.Namespace,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
}
for _, claim := range gcClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
allocID: claim.allocID,
nodeID: claim.nodeID,
mode: claim.mode,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
return result.ErrorOrNil()
}
type gcClaimRequest struct {


@@ -707,9 +707,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
// For a job with volumes, run volume claim GC before deleting the job
if job != nil {
volumeClaimReap(j.srv, j.logger, []*structs.Job{job}, j.srv.getLeaderAcl(), true)
// For a job with volumes, find its volumes before deleting the job
volumesToGC := make(map[string]*structs.VolumeRequest)
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source] = vol
}
}
}
// Commit this update via Raft
@@ -722,43 +727,75 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
evals := []*structs.Evaluation{}
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
// Add an evaluation for garbage collecting the CSI volume claims
// of terminal allocs
for _, vol := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
runningAllocs := ":ok"
if args.Purge {
runningAllocs = ":purge"
}
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source + runningAllocs,
LeaderACL: j.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
// If the job is periodic or parameterized, we don't create an eval
// for the job, but might still need one for the volumes
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
}
if len(evals) > 0 {
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}
// Populate the reply with eval information
reply.EvalID = evals[len(evals)-1].ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}


@@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation
// A set of de-duplicated IDs for jobs that need volume claim GC.
// Later we'll create a gc eval for each job.
jobsWithVolumeGCs := make(map[string]*structs.Job)
// A set of de-duplicated volumes that need volume claim GC.
// Later we'll create a gc eval for each volume.
volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]
for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1097,9 +1097,10 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}
// if the job has been purged, this will always return error
job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
n.logger.Error("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
continue
}
if job == nil {
@@ -1116,7 +1117,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
// of jobs we're going to call volume claim GC on.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
jobsWithVolumeGCs[job.ID] = job
volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
}
}
@@ -1138,17 +1139,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
// Add an evaluation for garbage collecting the CSI volume claims
// of jobs with terminal allocs
for _, job := range jobsWithVolumeGCs {
// of terminal allocs
for _, volAndNamespace := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the alloc's namespace
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Namespace: volAndNamespace[1],
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0] + ":no",
LeaderACL: n.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),


@@ -2406,7 +2406,7 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
// Verify the eval for the claim GC was emitted
// Lookup the evaluations
eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+job.ID)
eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0+":no")
require.NotNil(t, eval)
require.Nil(t, err)
}