a7a64443e1
This changeset adds a subsystem to run on the leader, similar to the deployment watcher or node drainer. The `Watcher` performs a blocking query on updates to the `CSIVolumes` table and triggers reaping of volume claims. This will avoid tying up scheduling workers by immediately sending volume claim workloads into their own loop, rather than blocking the scheduling workers in the core GC job doing things like talking to CSI controllers. The volume watcher is enabled on leader step-up and disabled on leader step-down. The volume claim GC mechanism now makes an empty claim RPC for the volume to trigger an index bump. That in turn unblocks the blocking query in the volume watcher so it can assess which claims can be released for a volume.
233 lines
6.3 KiB
Go
233 lines
6.3 KiB
Go
package volumewatcher
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
const (
	// LimitStateQueriesPerSecond caps how many state queries may be
	// issued per second.
	LimitStateQueriesPerSecond = 100.0

	// CrossVolumeUpdateBatchDuration is the window over which volume
	// claim updates from all volume watchers are gathered into a single
	// batch before that batch is committed to Raft.
	CrossVolumeUpdateBatchDuration = 250 * time.Millisecond
)
|
|
|
|
// Watcher is used to watch volumes and their allocations created
|
|
// by the scheduler and trigger the scheduler when allocation health
|
|
// transitions.
|
|
type Watcher struct {
|
|
enabled bool
|
|
logger log.Logger
|
|
|
|
// queryLimiter is used to limit the rate of blocking queries
|
|
queryLimiter *rate.Limiter
|
|
|
|
// updateBatchDuration is the duration in which volume
|
|
// claim updates are batched across all volume watchers
|
|
// before being committed to Raft.
|
|
updateBatchDuration time.Duration
|
|
|
|
// raft contains the set of Raft endpoints that can be used by the
|
|
// volumes watcher
|
|
raft VolumeRaftEndpoints
|
|
|
|
// rpc contains the set of Server methods that can be used by
|
|
// the volumes watcher for RPC
|
|
rpc ClientRPC
|
|
|
|
// state is the state that is watched for state changes.
|
|
state *state.StateStore
|
|
|
|
// watchers is the set of active watchers, one per volume
|
|
watchers map[string]*volumeWatcher
|
|
|
|
// volumeUpdateBatcher is used to batch volume claim updates
|
|
volumeUpdateBatcher *VolumeUpdateBatcher
|
|
|
|
// ctx and exitFn are used to cancel the watcher
|
|
ctx context.Context
|
|
exitFn context.CancelFunc
|
|
|
|
wlock sync.RWMutex
|
|
}
|
|
|
|
// NewVolumesWatcher returns a volumes watcher that is used to watch
|
|
// volumes and trigger the scheduler as needed.
|
|
func NewVolumesWatcher(logger log.Logger,
|
|
raft VolumeRaftEndpoints, rpc ClientRPC, stateQueriesPerSecond float64,
|
|
updateBatchDuration time.Duration) *Watcher {
|
|
|
|
// the leader step-down calls SetEnabled(false) which is what
|
|
// cancels this context, rather than passing in its own shutdown
|
|
// context
|
|
ctx, exitFn := context.WithCancel(context.Background())
|
|
|
|
return &Watcher{
|
|
raft: raft,
|
|
rpc: rpc,
|
|
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
|
|
updateBatchDuration: updateBatchDuration,
|
|
logger: logger.Named("volumes_watcher"),
|
|
ctx: ctx,
|
|
exitFn: exitFn,
|
|
}
|
|
}
|
|
|
|
// SetEnabled is used to control if the watcher is enabled. The
|
|
// watcher should only be enabled on the active leader. When being
|
|
// enabled the state is passed in as it is no longer valid once a
|
|
// leader election has taken place.
|
|
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
|
|
w.wlock.Lock()
|
|
defer w.wlock.Unlock()
|
|
|
|
wasEnabled := w.enabled
|
|
w.enabled = enabled
|
|
|
|
if state != nil {
|
|
w.state = state
|
|
}
|
|
|
|
// Flush the state to create the necessary objects
|
|
w.flush()
|
|
|
|
// If we are starting now, launch the watch daemon
|
|
if enabled && !wasEnabled {
|
|
go w.watchVolumes(w.ctx)
|
|
}
|
|
}
|
|
|
|
// flush is used to clear the state of the watcher
|
|
func (w *Watcher) flush() {
|
|
// Stop all the watchers and clear it
|
|
for _, watcher := range w.watchers {
|
|
watcher.Stop()
|
|
}
|
|
|
|
// Kill everything associated with the watcher
|
|
if w.exitFn != nil {
|
|
w.exitFn()
|
|
}
|
|
|
|
w.watchers = make(map[string]*volumeWatcher, 32)
|
|
w.ctx, w.exitFn = context.WithCancel(context.Background())
|
|
w.volumeUpdateBatcher = NewVolumeUpdateBatcher(w.updateBatchDuration, w.raft, w.ctx)
|
|
}
|
|
|
|
// watchVolumes is the long lived go-routine that watches for volumes to
|
|
// add and remove watchers on.
|
|
func (w *Watcher) watchVolumes(ctx context.Context) {
|
|
vIndex := uint64(1)
|
|
for {
|
|
volumes, idx, err := w.getVolumes(ctx, vIndex)
|
|
if err != nil {
|
|
if err == context.Canceled {
|
|
return
|
|
}
|
|
w.logger.Error("failed to retrieve volumes", "error", err)
|
|
}
|
|
|
|
vIndex = idx // last-seen index
|
|
for _, v := range volumes {
|
|
if err := w.add(v); err != nil {
|
|
w.logger.Error("failed to track volume", "volume_id", v.ID, "error", err)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// getVolumes retrieves all volumes blocking at the given index.
|
|
func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.CSIVolume, uint64, error) {
|
|
resp, index, err := w.state.BlockingQuery(w.getVolumesImpl, minIndex, ctx)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return resp.([]*structs.CSIVolume), index, nil
|
|
}
|
|
|
|
// getVolumesImpl retrieves all volumes from the passed state store.
|
|
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
|
|
|
|
iter, err := state.CSIVolumes(ws)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
var volumes []*structs.CSIVolume
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
volume := raw.(*structs.CSIVolume)
|
|
volumes = append(volumes, volume)
|
|
}
|
|
|
|
// Use the last index that affected the volume table
|
|
index, err := state.Index("csi_volumes")
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return volumes, index, nil
|
|
}
|
|
|
|
// add adds a volume to the watch list
|
|
func (w *Watcher) add(d *structs.CSIVolume) error {
|
|
w.wlock.Lock()
|
|
defer w.wlock.Unlock()
|
|
_, err := w.addLocked(d)
|
|
return err
|
|
}
|
|
|
|
// addLocked adds a volume to the watch list and should only be called when
|
|
// locked. Creating the volumeWatcher starts a go routine to .watch() it
|
|
func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {
|
|
// Not enabled so no-op
|
|
if !w.enabled {
|
|
return nil, nil
|
|
}
|
|
|
|
// Already watched so trigger an update for the volume
|
|
if watcher, ok := w.watchers[v.ID+v.Namespace]; ok {
|
|
watcher.Notify(v)
|
|
return nil, nil
|
|
}
|
|
|
|
watcher := newVolumeWatcher(w, v)
|
|
w.watchers[v.ID+v.Namespace] = watcher
|
|
return watcher, nil
|
|
}
|
|
|
|
// TODO: this is currently dead code; we'll call a public remove
|
|
// method on the Watcher once we have a periodic GC job
|
|
// remove stops watching a volume and should only be called when locked.
|
|
func (w *Watcher) removeLocked(volID, namespace string) {
|
|
if !w.enabled {
|
|
return
|
|
}
|
|
if watcher, ok := w.watchers[volID+namespace]; ok {
|
|
watcher.Stop()
|
|
delete(w.watchers, volID+namespace)
|
|
}
|
|
}
|
|
|
|
// updatesClaims sends the claims to the batch updater and waits for
|
|
// the results
|
|
func (w *Watcher) updateClaims(claims []structs.CSIVolumeClaimRequest) (uint64, error) {
|
|
return w.volumeUpdateBatcher.CreateUpdate(claims).Results()
|
|
}
|