client: Wait for preempted allocs to terminate
When starting an allocation that is preempting other allocs, we create a new group allocation watcher, and then wait for the allocations to terminate in the allocation PreRun hooks. If there's no preempted allocations, then we simply provide a NoopAllocWatcher.
This commit is contained in:
parent
2cdef6a7b4
commit
dff7093243
|
@ -102,6 +102,9 @@ type allocRunner struct {
|
|||
// and if necessary migrate its alloc dir.
|
||||
prevAllocWatcher allocwatcher.PrevAllocWatcher
|
||||
|
||||
// preemptedAllocWatcher allows waiting for preempted allocations to exit
|
||||
preemptedAllocWatcher allocwatcher.PrevAllocWatcher
|
||||
|
||||
// pluginSingletonLoader is a plugin loader that will returns singleton
|
||||
// instances of the plugins.
|
||||
pluginSingletonLoader loader.PluginCatalog
|
||||
|
@ -134,6 +137,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
|
|||
taskStateUpdateHandlerCh: make(chan struct{}),
|
||||
deviceStatsReporter: config.DeviceStatsReporter,
|
||||
prevAllocWatcher: config.PrevAllocWatcher,
|
||||
preemptedAllocWatcher: config.PreemptedAllocWatcher,
|
||||
pluginSingletonLoader: config.PluginSingletonLoader,
|
||||
devicemanager: config.DeviceManager,
|
||||
}
|
||||
|
@ -668,7 +672,7 @@ func (ar *allocRunner) IsDestroyed() bool {
|
|||
//
|
||||
// This method is safe for calling concurrently with Run().
|
||||
func (ar *allocRunner) IsWaiting() bool {
|
||||
return ar.prevAllocWatcher.IsWaiting()
|
||||
return ar.preemptedAllocWatcher.IsWaiting() || ar.prevAllocWatcher.IsWaiting()
|
||||
}
|
||||
|
||||
// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners.
|
||||
|
|
|
@ -77,6 +77,7 @@ func (ar *allocRunner) initRunnerHooks() {
|
|||
// directory path exists for other hooks.
|
||||
ar.runnerHooks = []interfaces.RunnerHook{
|
||||
newAllocDirHook(hookLogger, ar.allocDir),
|
||||
newPreemptionHook(hookLogger, ar.preemptedAllocWatcher),
|
||||
newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
|
||||
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
|
||||
}
|
||||
|
|
|
@ -43,6 +43,9 @@ type Config struct {
|
|||
// migrating their ephemeral disk when necessary.
|
||||
PrevAllocWatcher allocwatcher.PrevAllocWatcher
|
||||
|
||||
// PreemptedAllocWatcher allows waiting for preempted allocations to exit
|
||||
PreemptedAllocWatcher allocwatcher.PrevAllocWatcher
|
||||
|
||||
// PluginLoader is used to load plugins.
|
||||
PluginLoader loader.PluginCatalog
|
||||
|
||||
|
|
32
client/allocrunner/preemption_hook.go
Normal file
32
client/allocrunner/preemption_hook.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package allocrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocwatcher"
|
||||
)
|
||||
|
||||
// preemptionWatchingHook waits for a PrevAllocWatcher to exit before allowing
|
||||
// an allocation to be executed
|
||||
type preemptionWatchingHook struct {
|
||||
allocWatcher allocwatcher.PrevAllocWatcher
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func newPreemptionHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *preemptionWatchingHook {
|
||||
h := &preemptionWatchingHook{
|
||||
allocWatcher: allocWatcher,
|
||||
}
|
||||
h.logger = logger.Named(h.Name())
|
||||
return h
|
||||
}
|
||||
|
||||
func (h *preemptionWatchingHook) Name() string {
|
||||
return "await_preemptions"
|
||||
}
|
||||
|
||||
func (h *preemptionWatchingHook) Prerun(ctx context.Context) error {
|
||||
// Wait for a previous alloc - if any - to terminate
|
||||
return h.allocWatcher.Wait(ctx)
|
||||
}
|
|
@ -2040,6 +2040,28 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
|||
}
|
||||
prevAllocWatcher := allocwatcher.NewAllocWatcher(watcherConfig)
|
||||
|
||||
var preemptedAllocWatchers []allocwatcher.PrevAllocWatcher
|
||||
for _, palloc := range alloc.PreemptedAllocations {
|
||||
cfg := allocwatcher.Config{
|
||||
Alloc: alloc,
|
||||
PreviousRunner: c.allocs[palloc],
|
||||
RPC: c,
|
||||
Config: c.configCopy,
|
||||
Logger: c.logger,
|
||||
}
|
||||
w := allocwatcher.NewAllocWatcher(cfg)
|
||||
preemptedAllocWatchers = append(preemptedAllocWatchers, w)
|
||||
}
|
||||
|
||||
var preemptedAllocWatcher allocwatcher.PrevAllocWatcher = allocwatcher.NoopPrevAlloc{}
|
||||
if len(preemptedAllocWatchers) > 0 {
|
||||
var err error
|
||||
preemptedAllocWatcher, err = allocwatcher.NewGroupAllocWatcher(preemptedAllocWatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Copy the config since the node can be swapped out as it is being updated.
|
||||
// The long term fix is to pass in the config and node separately and then
|
||||
// we don't have to do a copy.
|
||||
|
@ -2054,6 +2076,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
|
|||
StateUpdater: c,
|
||||
DeviceStatsReporter: c,
|
||||
PrevAllocWatcher: prevAllocWatcher,
|
||||
PreemptedAllocWatcher: preemptedAllocWatcher,
|
||||
PluginLoader: c.config.PluginLoader,
|
||||
PluginSingletonLoader: c.config.PluginSingletonLoader,
|
||||
DeviceManager: c.devicemanager,
|
||||
|
|
Loading…
Reference in a new issue