72430a4e62
Following the new volumewatcher in #7794 and performance improvements to it that landed afterwards, there's no particular reason we should be threading claim releases through the GC eval rather than writing an empty `CSIVolumeClaimRequest` with the mode set to `CSIVolumeClaimRelease`, just as the GC evaluation would do. Also, by batching up these raft messages, we can reduce the number of raft writes by 1 and cross-server RPCs by 1 per volume we release claims on.
77 lines
2 KiB
Go
77 lines
2 KiB
Go
package nomad
|
|
|
|
import (
|
|
log "github.com/hashicorp/go-hclog"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// csiBatchRelease is a helper for any time we need to release a bunch
|
|
// of volume claims at once. It de-duplicates the volumes and batches
|
|
// the raft messages into manageable chunks. Intended for use by RPCs
|
|
// that have already been forwarded to the leader.
|
|
type csiBatchRelease struct {
|
|
srv *Server
|
|
logger log.Logger
|
|
|
|
maxBatchSize int
|
|
seen map[string]struct{}
|
|
batches []*structs.CSIVolumeClaimBatchRequest
|
|
}
|
|
|
|
func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
|
|
return &csiBatchRelease{
|
|
srv: srv,
|
|
logger: logger,
|
|
maxBatchSize: max,
|
|
seen: map[string]struct{}{},
|
|
batches: []*structs.CSIVolumeClaimBatchRequest{{}},
|
|
}
|
|
}
|
|
|
|
// add the volume ID + namespace to the deduplicated batches
|
|
func (c *csiBatchRelease) add(vol, namespace string) {
|
|
id := vol + "\x00" + namespace
|
|
|
|
// ignore duplicates
|
|
_, seen := c.seen[id]
|
|
if seen {
|
|
return
|
|
}
|
|
c.seen[id] = struct{}{}
|
|
|
|
req := structs.CSIVolumeClaimRequest{
|
|
VolumeID: vol,
|
|
Claim: structs.CSIVolumeClaimRelease,
|
|
}
|
|
req.Namespace = namespace
|
|
req.Region = c.srv.config.Region
|
|
|
|
for _, batch := range c.batches {
|
|
// otherwise append to the first non-full batch
|
|
if len(batch.Claims) < c.maxBatchSize {
|
|
batch.Claims = append(batch.Claims, req)
|
|
return
|
|
}
|
|
}
|
|
// no non-full batch found, make a new one
|
|
newBatch := &structs.CSIVolumeClaimBatchRequest{
|
|
Claims: []structs.CSIVolumeClaimRequest{req}}
|
|
c.batches = append(c.batches, newBatch)
|
|
}
|
|
|
|
// apply flushes the batches to raft
|
|
func (c *csiBatchRelease) apply() error {
|
|
var result *multierror.Error
|
|
for _, batch := range c.batches {
|
|
if len(batch.Claims) > 0 {
|
|
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
|
|
if err != nil {
|
|
c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
|
|
result = multierror.Append(result, err)
|
|
}
|
|
}
|
|
}
|
|
return result.ErrorOrNil()
|
|
}
|