759310d13a
The volume watcher design was based on deploymentwatcher and drainer, but has an important difference: we don't want to maintain a goroutine for the lifetime of the volume. So we stop the volumewatcher goroutine for a volume when that volume has no more claims to free. But the shutdown races with updates on the parent goroutine, and it's possible to drop updates. Fortunately these updates are picked up on the next core GC job, but we're most likely to hit this race when we're replacing an allocation and that's the time we least want to wait. Wait until the volume has "settled" before stopping this goroutine so that the race between shutdown and the parent goroutine sending on `<-updateCh` is pushed to after the window we most care about quick freeing of claims. * Fixes a resource leak when volumewatchers are no longer needed. The volume is nil and can't ever be started again, so the volume's `watcher` should be removed from the top-level `Watcher`. * De-flakes the GC job test: the test throws an error because the claimed node doesn't exist and is unreachable. This flaked instead of failed because we didn't correctly wait for the first pass through the volumewatcher. Make the GC job wait for the volumewatcher to reach the quiescent timeout window state before running the GC eval under test, so that we're sure the GC job's work isn't being picked up by processing one of the earlier claims. Update the claims used so that we're sure the GC pass won't hit a node unpublish error. * Adds trace logging to unpublish operations
273 lines
7.4 KiB
Go
273 lines
7.4 KiB
Go
package volumewatcher
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// volumeWatcher is used to watch a single volume and trigger the
|
|
// scheduler when allocation health transitions.
|
|
type volumeWatcher struct {
|
|
// v is the volume being watched
|
|
v *structs.CSIVolume
|
|
|
|
// state is the state that is watched for state changes.
|
|
state *state.StateStore
|
|
|
|
// server interface for CSI client RPCs
|
|
rpc CSIVolumeRPC
|
|
|
|
// the ACL needed to send RPCs
|
|
leaderAcl string
|
|
|
|
logger log.Logger
|
|
shutdownCtx context.Context // parent context
|
|
ctx context.Context // own context
|
|
exitFn context.CancelFunc
|
|
deleteFn func()
|
|
|
|
// quiescentTimeout is the time we wait until the volume has "settled"
|
|
// before stopping the child watcher goroutines
|
|
quiescentTimeout time.Duration
|
|
|
|
// updateCh is triggered when there is an updated volume
|
|
updateCh chan *structs.CSIVolume
|
|
|
|
wLock sync.RWMutex
|
|
running bool
|
|
}
|
|
|
|
// newVolumeWatcher returns a volume watcher that is used to watch
|
|
// volumes
|
|
func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher {
|
|
|
|
w := &volumeWatcher{
|
|
updateCh: make(chan *structs.CSIVolume, 1),
|
|
v: vol,
|
|
state: parent.state,
|
|
rpc: parent.rpc,
|
|
leaderAcl: parent.leaderAcl,
|
|
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
|
|
shutdownCtx: parent.ctx,
|
|
deleteFn: func() { parent.remove(vol.ID + vol.Namespace) },
|
|
quiescentTimeout: parent.quiescentTimeout,
|
|
}
|
|
|
|
// Start the long lived watcher that scans for allocation updates
|
|
w.Start()
|
|
return w
|
|
}
|
|
|
|
// Notify signals an update to the tracked volume.
|
|
func (vw *volumeWatcher) Notify(v *structs.CSIVolume) {
|
|
if !vw.isRunning() {
|
|
vw.Start()
|
|
}
|
|
select {
|
|
case vw.updateCh <- v:
|
|
case <-vw.shutdownCtx.Done(): // prevent deadlock if we stopped
|
|
case <-vw.ctx.Done(): // prevent deadlock if we stopped
|
|
}
|
|
}
|
|
|
|
func (vw *volumeWatcher) Start() {
|
|
vw.logger.Trace("starting watcher")
|
|
vw.wLock.Lock()
|
|
defer vw.wLock.Unlock()
|
|
vw.running = true
|
|
ctx, exitFn := context.WithCancel(vw.shutdownCtx)
|
|
vw.ctx = ctx
|
|
vw.exitFn = exitFn
|
|
go vw.watch()
|
|
}
|
|
|
|
// Stop stops watching the volume. This should be called whenever a
|
|
// volume's claims are fully reaped or the watcher is no longer needed.
|
|
func (vw *volumeWatcher) Stop() {
|
|
vw.logger.Trace("no more claims")
|
|
vw.exitFn()
|
|
}
|
|
|
|
func (vw *volumeWatcher) isRunning() bool {
|
|
vw.wLock.RLock()
|
|
defer vw.wLock.RUnlock()
|
|
select {
|
|
case <-vw.shutdownCtx.Done():
|
|
return false
|
|
case <-vw.ctx.Done():
|
|
return false
|
|
default:
|
|
return vw.running
|
|
}
|
|
}
|
|
|
|
// watch is the long-running function that watches for changes to a volume.
|
|
// Each pass steps the volume's claims through the various states of reaping
|
|
// until the volume has no more claims eligible to be reaped.
|
|
func (vw *volumeWatcher) watch() {
|
|
// always denormalize the volume and call reap when we first start
|
|
// the watcher so that we ensure we don't drop events that
|
|
// happened during leadership transitions and didn't get completed
|
|
// by the prior leader
|
|
vol := vw.getVolume(vw.v)
|
|
vw.volumeReap(vol)
|
|
|
|
timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
|
|
defer stop()
|
|
|
|
for {
|
|
select {
|
|
// TODO(tgross): currently server->client RPC have no cancellation
|
|
// context, so we can't stop the long-runner RPCs gracefully
|
|
case <-vw.shutdownCtx.Done():
|
|
return
|
|
case <-vw.ctx.Done():
|
|
return
|
|
case vol := <-vw.updateCh:
|
|
vol = vw.getVolume(vol)
|
|
if vol == nil {
|
|
// We stop the goroutine whenever we have no more
|
|
// work, but only delete the watcher when the volume
|
|
// is gone to avoid racing the blocking query
|
|
vw.deleteFn()
|
|
vw.Stop()
|
|
return
|
|
}
|
|
vw.volumeReap(vol)
|
|
timer.Reset(vw.quiescentTimeout)
|
|
case <-timer.C:
|
|
// Wait until the volume has "settled" before stopping
|
|
// this goroutine so that the race between shutdown and
|
|
// the parent goroutine sending on <-updateCh is pushed to
|
|
// after the window we most care about quick freeing of
|
|
// claims (and the GC job will clean up anything we miss)
|
|
vol = vw.getVolume(vol)
|
|
if vol == nil {
|
|
vw.deleteFn()
|
|
}
|
|
vw.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// getVolume returns the tracked volume, fully populated with the current
|
|
// state
|
|
func (vw *volumeWatcher) getVolume(vol *structs.CSIVolume) *structs.CSIVolume {
|
|
vw.wLock.RLock()
|
|
defer vw.wLock.RUnlock()
|
|
|
|
var err error
|
|
ws := memdb.NewWatchSet()
|
|
|
|
vol, err = vw.state.CSIVolumeByID(ws, vol.Namespace, vol.ID)
|
|
if err != nil {
|
|
vw.logger.Error("could not query for volume", "error", err)
|
|
return nil
|
|
}
|
|
if vol == nil {
|
|
return nil
|
|
}
|
|
|
|
vol, err = vw.state.CSIVolumeDenormalize(ws, vol)
|
|
if err != nil {
|
|
vw.logger.Error("could not query allocs for volume", "error", err)
|
|
return nil
|
|
}
|
|
vw.v = vol
|
|
return vol
|
|
}
|
|
|
|
// volumeReap collects errors for logging but doesn't return them
|
|
// to the main loop.
|
|
func (vw *volumeWatcher) volumeReap(vol *structs.CSIVolume) {
|
|
vw.logger.Trace("releasing unused volume claims")
|
|
err := vw.volumeReapImpl(vol)
|
|
if err != nil {
|
|
vw.logger.Error("error releasing volume claims", "error", err)
|
|
}
|
|
}
|
|
|
|
func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
|
|
return len(vol.ReadClaims) == 0 && len(vol.WriteClaims) == 0 && len(vol.PastClaims) == 0
|
|
}
|
|
|
|
// volumeReapImpl unpublished all the volume's PastClaims. PastClaims
|
|
// will be populated from nil or terminal allocs when we call
|
|
// CSIVolumeDenormalize(), so this assumes we've done so in the caller
|
|
func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {
|
|
var result *multierror.Error
|
|
for _, claim := range vol.PastClaims {
|
|
err := vw.unpublish(vol, claim)
|
|
if err != nil {
|
|
result = multierror.Append(result, err)
|
|
}
|
|
}
|
|
return result.ErrorOrNil()
|
|
}
|
|
|
|
func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIVolume {
|
|
|
|
collect := func(allocs map[string]*structs.Allocation,
|
|
claims map[string]*structs.CSIVolumeClaim) {
|
|
|
|
for allocID, alloc := range allocs {
|
|
if alloc == nil {
|
|
_, exists := vol.PastClaims[allocID]
|
|
if !exists {
|
|
vol.PastClaims[allocID] = &structs.CSIVolumeClaim{
|
|
AllocationID: allocID,
|
|
State: structs.CSIVolumeClaimStateReadyToFree,
|
|
}
|
|
}
|
|
} else if alloc.Terminated() {
|
|
// don't overwrite the PastClaim if we've seen it before,
|
|
// so that we can track state between subsequent calls
|
|
_, exists := vol.PastClaims[allocID]
|
|
if !exists {
|
|
claim, ok := claims[allocID]
|
|
if !ok {
|
|
claim = &structs.CSIVolumeClaim{
|
|
AllocationID: allocID,
|
|
NodeID: alloc.NodeID,
|
|
}
|
|
}
|
|
claim.State = structs.CSIVolumeClaimStateTaken
|
|
vol.PastClaims[allocID] = claim
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
collect(vol.ReadAllocs, vol.ReadClaims)
|
|
collect(vol.WriteAllocs, vol.WriteClaims)
|
|
return vol
|
|
}
|
|
|
|
func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
|
|
vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID)
|
|
req := &structs.CSIVolumeUnpublishRequest{
|
|
VolumeID: vol.ID,
|
|
Claim: claim,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: vol.Namespace,
|
|
Region: vw.state.Config().Region,
|
|
AuthToken: vw.leaderAcl,
|
|
},
|
|
}
|
|
err := vw.rpc.Unpublish(req, &structs.CSIVolumeUnpublishResponse{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
claim.State = structs.CSIVolumeClaimStateReadyToFree
|
|
return nil
|
|
}
|