open-nomad/nomad/volumewatcher/volumes_watcher.go

233 lines
6.3 KiB
Go
Raw Normal View History

package volumewatcher
import (
"context"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/time/rate"
)
const (
// LimitStateQueriesPerSecond is the number of state queries allowed per
// second
LimitStateQueriesPerSecond = 100.0
// CrossVolumeUpdateBatchDuration is the duration in which volume
// claim updates are batched across all volume watchers before
// being committed to Raft.
CrossVolumeUpdateBatchDuration = 250 * time.Millisecond
)
// Watcher is used to watch volumes and their allocations created
// by the scheduler and trigger the scheduler when allocation health
// transitions.
type Watcher struct {
enabled bool
logger log.Logger
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
// updateBatchDuration is the duration in which volume
// claim updates are batched across all volume watchers
// before being committed to Raft.
updateBatchDuration time.Duration
// raft contains the set of Raft endpoints that can be used by the
// volumes watcher
raft VolumeRaftEndpoints
// rpc contains the set of Server methods that can be used by
// the volumes watcher for RPC
rpc ClientRPC
// state is the state that is watched for state changes.
state *state.StateStore
// watchers is the set of active watchers, one per volume
watchers map[string]*volumeWatcher
// volumeUpdateBatcher is used to batch volume claim updates
volumeUpdateBatcher *VolumeUpdateBatcher
// ctx and exitFn are used to cancel the watcher
ctx context.Context
exitFn context.CancelFunc
wlock sync.RWMutex
}
// NewVolumesWatcher returns a volumes watcher that is used to watch
// volumes and trigger the scheduler as needed.
func NewVolumesWatcher(logger log.Logger,
raft VolumeRaftEndpoints, rpc ClientRPC, stateQueriesPerSecond float64,
updateBatchDuration time.Duration) *Watcher {
// the leader step-down calls SetEnabled(false) which is what
// cancels this context, rather than passing in its own shutdown
// context
ctx, exitFn := context.WithCancel(context.Background())
return &Watcher{
raft: raft,
rpc: rpc,
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
updateBatchDuration: updateBatchDuration,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
}
}
// SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a
// leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
w.wlock.Lock()
defer w.wlock.Unlock()
wasEnabled := w.enabled
w.enabled = enabled
if state != nil {
w.state = state
}
// Flush the state to create the necessary objects
w.flush()
// If we are starting now, launch the watch daemon
if enabled && !wasEnabled {
go w.watchVolumes(w.ctx)
}
}
// flush is used to clear the state of the watcher
func (w *Watcher) flush() {
// Stop all the watchers and clear it
for _, watcher := range w.watchers {
watcher.Stop()
}
// Kill everything associated with the watcher
if w.exitFn != nil {
w.exitFn()
}
w.watchers = make(map[string]*volumeWatcher, 32)
w.ctx, w.exitFn = context.WithCancel(context.Background())
w.volumeUpdateBatcher = NewVolumeUpdateBatcher(w.updateBatchDuration, w.raft, w.ctx)
}
// watchVolumes is the long lived go-routine that watches for volumes to
// add and remove watchers on.
func (w *Watcher) watchVolumes(ctx context.Context) {
vIndex := uint64(1)
for {
volumes, idx, err := w.getVolumes(ctx, vIndex)
if err != nil {
if err == context.Canceled {
return
}
w.logger.Error("failed to retrieve volumes", "error", err)
}
vIndex = idx // last-seen index
for _, v := range volumes {
if err := w.add(v); err != nil {
w.logger.Error("failed to track volume", "volume_id", v.ID, "error", err)
}
}
}
}
// getVolumes retrieves all volumes blocking at the given index.
func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.CSIVolume, uint64, error) {
resp, index, err := w.state.BlockingQuery(w.getVolumesImpl, minIndex, ctx)
if err != nil {
return nil, 0, err
}
return resp.([]*structs.CSIVolume), index, nil
}
// getVolumesImpl retrieves all volumes from the passed state store.
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.CSIVolumes(ws)
if err != nil {
return nil, 0, err
}
var volumes []*structs.CSIVolume
for {
raw := iter.Next()
if raw == nil {
break
}
volume := raw.(*structs.CSIVolume)
volumes = append(volumes, volume)
}
// Use the last index that affected the volume table
index, err := state.Index("csi_volumes")
if err != nil {
return nil, 0, err
}
return volumes, index, nil
}
// add adds a volume to the watch list
func (w *Watcher) add(d *structs.CSIVolume) error {
w.wlock.Lock()
defer w.wlock.Unlock()
_, err := w.addLocked(d)
return err
}
// addLocked adds a volume to the watch list and should only be called when
// locked. Creating the volumeWatcher starts a go routine to .watch() it
func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {
// Not enabled so no-op
if !w.enabled {
return nil, nil
}
// Already watched so trigger an update for the volume
if watcher, ok := w.watchers[v.ID+v.Namespace]; ok {
watcher.Notify(v)
return nil, nil
}
watcher := newVolumeWatcher(w, v)
w.watchers[v.ID+v.Namespace] = watcher
return watcher, nil
}
// TODO: this is currently dead code; we'll call a public remove
// method on the Watcher once we have a periodic GC job
// remove stops watching a volume and should only be called when locked.
func (w *Watcher) removeLocked(volID, namespace string) {
if !w.enabled {
return
}
if watcher, ok := w.watchers[volID+namespace]; ok {
watcher.Stop()
delete(w.watchers, volID+namespace)
}
}
// updatesClaims sends the claims to the batch updater and waits for
// the results
func (w *Watcher) updateClaims(claims []structs.CSIVolumeClaimRequest) (uint64, error) {
return w.volumeUpdateBatcher.CreateUpdate(claims).Results()
}