Merge pull request #4963 from hashicorp/dani/f-preempt-alloc-wait

client: Wait for preemptions to terminate
Danielle Tomlinson 2018-12-11 18:06:34 +01:00 committed by GitHub
commit d11c62fa3a
11 changed files with 392 additions and 52 deletions
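
The change splits the previous-alloc watcher into two roles: a PrevAllocWatcher that blocks until previous or preempted allocations terminate, and a PrevAllocMigrator that migrates a previous allocation's ephemeral disk. At the call site in the client, construction changes roughly as follows (a sketch distilled from the diff below):

```go
// Before: a single value handled both waiting and migration.
prevAllocWatcher := allocwatcher.NewAllocWatcher(watcherConfig)

// After: waiting and migration are separate concerns, so preempted
// allocations can be awaited without implying any disk migration.
prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig)
```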

View file

@@ -98,10 +98,13 @@ type allocRunner struct {
// allocBroadcaster sends client allocation updates to all listeners
allocBroadcaster *cstructs.AllocBroadcaster
-// prevAllocWatcher allows waiting for a previous allocation to exit
-// and if necessary migrate its alloc dir.
+// prevAllocWatcher allows waiting for any previous or preempted allocations
+// to exit
prevAllocWatcher allocwatcher.PrevAllocWatcher
+// prevAllocMigrator allows the migration of a previous allocation's alloc dir.
+prevAllocMigrator allocwatcher.PrevAllocMigrator
// pluginSingletonLoader is a plugin loader that returns singleton
// instances of the plugins.
pluginSingletonLoader loader.PluginCatalog
@@ -134,6 +137,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
taskStateUpdateHandlerCh: make(chan struct{}),
deviceStatsReporter: config.DeviceStatsReporter,
prevAllocWatcher: config.PrevAllocWatcher,
+prevAllocMigrator: config.PrevAllocMigrator,
pluginSingletonLoader: config.PluginSingletonLoader,
devicemanager: config.DeviceManager,
}
@@ -713,7 +717,7 @@ func (ar *allocRunner) Shutdown() {
//
// This method is safe for calling concurrently with Run().
func (ar *allocRunner) IsMigrating() bool {
-return ar.prevAllocWatcher.IsMigrating()
+return ar.prevAllocMigrator.IsMigrating()
}
func (ar *allocRunner) StatsReporter() interfaces.AllocStatsReporter {

View file

@@ -77,7 +77,8 @@ func (ar *allocRunner) initRunnerHooks() {
// directory path exists for other hooks.
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar.allocDir),
-newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
+newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
+newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, ar.Alloc(), hs, ar.Listener(), ar.consulClient),
}
}
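
Hook ordering is the point of this change: newUpstreamAllocsHook is registered before newDiskMigrationHook, so every previous or preempted allocation has terminated before any ephemeral-disk migration begins. A hypothetical sketch of a prerun driver that would run these hooks in registration order (the actual runner loop is not part of this diff, and the RunnerPrerunHook interface name is an assumption):

```go
// Hypothetical driver, for illustration only: each prerun-capable hook runs
// in the order it was appended in initRunnerHooks, so upstreamAllocsHook
// finishes (all watched allocs terminal) before diskMigrationHook starts.
for _, hook := range ar.runnerHooks {
	pre, ok := hook.(interfaces.RunnerPrerunHook) // assumed interface name
	if !ok {
		continue
	}
	if err := pre.Prerun(ctx); err != nil {
		return fmt.Errorf("prerun hook %q failed: %v", hook.Name(), err)
	}
}
```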

View file

@@ -36,13 +36,15 @@ type Config struct {
// StateUpdater is used to emit updated task state
StateUpdater interfaces.AllocStateHandler
-// deviceStatsReporter is used to lookup resource usage for alloc devices
+// DeviceStatsReporter is used to lookup resource usage for alloc devices
DeviceStatsReporter interfaces.DeviceStatsReporter
-// PrevAllocWatcher handles waiting on previous allocations and
-// migrating their ephemeral disk when necessary.
+// PrevAllocWatcher handles waiting on previous or preempted allocations
PrevAllocWatcher allocwatcher.PrevAllocWatcher
+// PrevAllocMigrator allows the migration of a previous allocation's alloc dir
+PrevAllocMigrator allocwatcher.PrevAllocMigrator
// PluginLoader is used to load plugins.
PluginLoader loader.PluginCatalog

View file

@@ -13,11 +13,11 @@ import (
// being built but must be run before anything else manipulates the alloc dir.
type diskMigrationHook struct {
allocDir *allocdir.AllocDir
-allocWatcher allocwatcher.PrevAllocWatcher
+allocWatcher allocwatcher.PrevAllocMigrator
logger log.Logger
}
-func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher, allocDir *allocdir.AllocDir) *diskMigrationHook {
+func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook {
h := &diskMigrationHook{
allocDir: allocDir,
allocWatcher: allocWatcher,

View file

@@ -63,6 +63,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
+PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
DeviceManager: devicemanager.NoopMockManager(),
}

View file

@@ -0,0 +1,32 @@
package allocrunner
import (
"context"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocwatcher"
)
// upstreamAllocsHook blocks on a PrevAllocWatcher until any previous or
// preempted allocations have terminated, before allowing an allocation to
// be executed
type upstreamAllocsHook struct {
allocWatcher allocwatcher.PrevAllocWatcher
logger log.Logger
}
func newUpstreamAllocsHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *upstreamAllocsHook {
h := &upstreamAllocsHook{
allocWatcher: allocWatcher,
}
h.logger = logger.Named(h.Name())
return h
}
func (h *upstreamAllocsHook) Name() string {
return "await_previous_allocations"
}
func (h *upstreamAllocsHook) Prerun(ctx context.Context) error {
// Wait for a previous alloc - if any - to terminate
return h.allocWatcher.Wait(ctx)
}
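
A minimal usage sketch of the hook in isolation (the test-logger helper and the timeout are illustrative assumptions; NoopPrevAlloc is the no-op watcher from the allocwatcher package shown later in this diff):

```go
// With a noop watcher Prerun returns immediately; with a real watcher it
// blocks until every watched allocation is terminal or ctx is cancelled.
h := newUpstreamAllocsHook(testlog.HCLogger(t), allocwatcher.NoopPrevAlloc{})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := h.Prerun(ctx); err != nil {
	t.Fatalf("prerun failed: %v", err)
}
```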

View file

@@ -47,20 +47,26 @@ type AllocRunnerMeta interface {
}
// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to
-// terminate and migrate its data whether or not the previous allocation is
-// local or remote.
+// terminate whether or not the previous allocation is local or remote.
+// See `PrevAllocMigrator` for migrating workloads.
type PrevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
-// Migrate data from previous alloc
-Migrate(ctx context.Context, dest *allocdir.AllocDir) error
// IsWaiting returns true if a concurrent caller is blocked in Wait
IsWaiting() bool
}
+// PrevAllocMigrator allows AllocRunners to migrate a previous allocation
+// whether or not the previous allocation is local or remote.
+type PrevAllocMigrator interface {
+PrevAllocWatcher
+// IsMigrating returns true if a concurrent caller is in Migrate
+IsMigrating() bool
+// Migrate data from previous alloc
+Migrate(ctx context.Context, dest *allocdir.AllocDir) error
+}
type Config struct {
@@ -68,10 +74,13 @@ type Config struct {
// previous allocation stopping.
Alloc *structs.Allocation
-// PreviousRunner is non-nil iff All has a PreviousAllocation and it is
+// PreviousRunner is non-nil if Alloc has a PreviousAllocation and it is
// running locally.
PreviousRunner AllocRunnerMeta
+// PreemptedRunners is non-nil if Alloc has one or more PreemptedAllocations.
+PreemptedRunners map[string]AllocRunnerMeta
// RPC allows the alloc watcher to monitor remote allocations.
RPC RPCer
@@ -85,31 +94,23 @@
Logger hclog.Logger
}
-// NewAllocWatcher creates a PrevAllocWatcher appropriate for whether this
-// alloc's previous allocation was local or remote. If this alloc has no
-// previous alloc then a noop implementation is returned.
-func NewAllocWatcher(c Config) PrevAllocWatcher {
-if c.Alloc.PreviousAllocation == "" {
-// No previous allocation, use noop transitioner
-return NoopPrevAlloc{}
-}
-logger := c.Logger.Named("alloc_watcher")
-logger = logger.With("alloc_id", c.Alloc.ID)
-logger = logger.With("previous_alloc", c.Alloc.PreviousAllocation)
-tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
-if c.PreviousRunner != nil {
-// Previous allocation is local, use local transitioner
+func newMigratorForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) PrevAllocMigrator {
+logger := c.Logger.Named("alloc_migrator").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID)
+tasks := tg.Tasks
+sticky := tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky
+migrate := tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate
+if m != nil {
+// Local allocation: the runner meta is available, watch it in-process
return &localPrevAlloc{
allocID: c.Alloc.ID,
-prevAllocID: c.Alloc.PreviousAllocation,
-tasks: tg.Tasks,
-sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky,
-prevAllocDir: c.PreviousRunner.GetAllocDir(),
-prevListener: c.PreviousRunner.Listener(),
-prevStatus: c.PreviousRunner.Alloc(),
+prevAllocID: watchedAllocID,
+tasks: tasks,
+sticky: sticky,
+prevAllocDir: m.GetAllocDir(),
+prevListener: m.Listener(),
+prevStatus: m.Alloc(),
logger: logger,
}
}
@@ -117,15 +118,75 @@ func NewAllocWatcher(c Config) PrevAllocWatcher {
return &remotePrevAlloc{
allocID: c.Alloc.ID,
prevAllocID: c.Alloc.PreviousAllocation,
-tasks: tg.Tasks,
+tasks: tasks,
config: c.Config,
-migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
+migrate: migrate,
rpc: c.RPC,
migrateToken: c.MigrateToken,
logger: logger,
}
}
+func newWatcherForAlloc(c Config, watchedAllocID string, m AllocRunnerMeta) PrevAllocWatcher {
+logger := c.Logger.Named("alloc_watcher").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID)
+if m != nil {
+// Local allocation: the runner meta is available, watch it in-process
+return &localPrevAlloc{
+allocID: c.Alloc.ID,
+prevAllocID: watchedAllocID,
+prevAllocDir: m.GetAllocDir(),
+prevListener: m.Listener(),
+prevStatus: m.Alloc(),
+logger: logger,
+}
+}
+// No local runner meta: watch the allocation remotely over RPC
+return &remotePrevAlloc{
+allocID: c.Alloc.ID,
+prevAllocID: watchedAllocID,
+config: c.Config,
+rpc: c.RPC,
+migrateToken: c.MigrateToken,
+logger: logger,
+}
+}
+// NewAllocWatcher creates a PrevAllocWatcher and a PrevAllocMigrator
+// appropriate for whether this alloc's previous and preempted allocations
+// are local or remote. If the alloc has neither a previous allocation nor
+// preempted allocations, noop implementations are returned.
+func NewAllocWatcher(c Config) (PrevAllocWatcher, PrevAllocMigrator) {
+if c.Alloc.PreviousAllocation == "" && c.PreemptedRunners == nil {
+return NoopPrevAlloc{}, NoopPrevAlloc{}
+}
+var prevAllocWatchers []PrevAllocWatcher
+var prevAllocMigrator PrevAllocMigrator = NoopPrevAlloc{}
+// We have a previous allocation: watch it and use it as the migrator.
+if c.Alloc.PreviousAllocation != "" {
+tg := c.Alloc.Job.LookupTaskGroup(c.Alloc.TaskGroup)
+m := newMigratorForAlloc(c, tg, c.Alloc.PreviousAllocation, c.PreviousRunner)
+prevAllocWatchers = append(prevAllocWatchers, m)
+prevAllocMigrator = m
+}
+// We are preempting allocations: add a watcher for each of them.
+if c.PreemptedRunners != nil {
+for aid, r := range c.PreemptedRunners {
+w := newWatcherForAlloc(c, aid, r)
+prevAllocWatchers = append(prevAllocWatchers, w)
+}
+}
+groupWatcher := &groupPrevAllocWatcher{
+prevAllocs: prevAllocWatchers,
+}
+return groupWatcher, prevAllocMigrator
+}
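
Putting the pieces together, construction from the client's perspective now looks roughly like this (a sketch; the variable names are illustrative, not part of this diff):

```go
// An alloc that replaces one allocation and preempts two others.
cfg := allocwatcher.Config{
	Alloc:          alloc, // PreviousAllocation and PreemptedAllocations set
	PreviousRunner: prevRunner,
	PreemptedRunners: map[string]allocwatcher.AllocRunnerMeta{
		preemptedID1: runner1, // local runner: watched via its listener
		preemptedID2: nil,     // no local meta: watched remotely over RPC
	},
	RPC:    rpcClient,
	Config: clientConfig,
	Logger: logger,
}

watcher, migrator := allocwatcher.NewAllocWatcher(cfg)
// watcher.Wait(ctx) blocks until the previous alloc and both preempted
// allocs are terminal; migrator only ever migrates the previous alloc's
// ephemeral disk.
```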
// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {

View file

@@ -97,20 +97,20 @@ func TestPrevAlloc_Noop(t *testing.T) {
conf.Alloc.PreviousAllocation = ""
-watcher := NewAllocWatcher(conf)
+watcher, migrator := NewAllocWatcher(conf)
require.NotNil(t, watcher)
_, ok := watcher.(NoopPrevAlloc)
require.True(t, ok, "expected watcher to be NoopPrevAlloc")
+_, ok = migrator.(NoopPrevAlloc)
+require.True(t, ok, "expected migrator to be NoopPrevAlloc")
done := make(chan int, 2)
go func() {
watcher.Wait(context.Background())
done <- 1
-watcher.Migrate(context.Background(), nil)
+migrator.Migrate(context.Background(), nil)
done <- 1
}()
require.False(t, watcher.IsWaiting())
-require.False(t, watcher.IsMigrating())
+require.False(t, migrator.IsMigrating())
<-done
<-done
}
@@ -127,7 +127,7 @@ func TestPrevAlloc_LocalPrevAlloc_Block(t *testing.T) {
"run_for": "500ms",
}
-waiter := NewAllocWatcher(conf)
+_, waiter := NewAllocWatcher(conf)
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
@@ -191,7 +191,7 @@ func TestPrevAlloc_LocalPrevAlloc_Terminated(t *testing.T) {
conf.PreviousRunner.Alloc().ClientStatus = structs.AllocClientStatusComplete
-waiter := NewAllocWatcher(conf)
+waiter, _ := NewAllocWatcher(conf)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

View file

@@ -0,0 +1,75 @@
package allocwatcher
import (
"context"
"sync"
multierror "github.com/hashicorp/go-multierror"
)
type groupPrevAllocWatcher struct {
prevAllocs []PrevAllocWatcher
wg sync.WaitGroup
// waiting is true while the alloc runner is blocked in Wait on the
// sub-watchers. Writers must acquire waitingLock; readers should use
// the helper method IsWaiting.
waiting bool
waitingLock sync.RWMutex
}
func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) PrevAllocWatcher {
return &groupPrevAllocWatcher{
prevAllocs: watchers,
}
}
// Wait waits for all previous allocs to become terminal, returning early if
// the context is cancelled. Users of groupPrevAllocWatcher must ensure that
// all sub-watchers correctly handle context cancellation.
// We may need to switch from a WaitGroup to channels if we want to enforce
// timeouts more strictly.
func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error {
g.waitingLock.Lock()
g.waiting = true
g.waitingLock.Unlock()
defer func() {
g.waitingLock.Lock()
g.waiting = false
g.waitingLock.Unlock()
}()
var merr multierror.Error
var errmu sync.Mutex
g.wg.Add(len(g.prevAllocs))
for _, alloc := range g.prevAllocs {
go func(ctx context.Context, alloc PrevAllocWatcher) {
defer g.wg.Done()
err := alloc.Wait(ctx)
if err != nil {
errmu.Lock()
merr.Errors = append(merr.Errors, err)
errmu.Unlock()
}
}(ctx, alloc)
}
g.wg.Wait()
// Check ctx.Err() first to avoid returning a multierror made up of ctx.Err()
// from each of the prevAlloc Wait routines.
if err := ctx.Err(); err != nil {
return err
}
return merr.ErrorOrNil()
}
func (g *groupPrevAllocWatcher) IsWaiting() bool {
g.waitingLock.RLock()
defer g.waitingLock.RUnlock()
return g.waiting
}
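
A brief usage sketch of the exported constructor (the sub-watchers and the timeout are illustrative):

```go
// Compose watchers for several upstream allocs and wait on all of them.
group := allocwatcher.NewGroupAllocWatcher(w1, w2, w3)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// Wait returns ctx.Err() if the deadline fires first; otherwise it returns
// the merged errors, if any, from the individual sub-watchers.
if err := group.Wait(ctx); err != nil {
	log.Printf("upstream allocs did not terminate cleanly: %v", err)
}
```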

View file

@@ -0,0 +1,151 @@
package allocwatcher
import (
"context"
"fmt"
"testing"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// TestPrevAlloc_GroupPrevAllocWatcher_Block asserts that when prevAllocs
// are set, a groupPrevAllocWatcher blocks on them
func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) {
t.Parallel()
conf, cleanup := newConfig(t)
defer cleanup()
conf.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "500ms",
}
waiter, _ := NewAllocWatcher(conf)
groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}}
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer cancel()
groupWaiter.Wait(ctx)
}()
// Assert watcher is waiting
testutil.WaitForResult(func() (bool, error) {
return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Broadcast a non-terminal alloc update to assert only terminal
// updates break out of waiting.
update := conf.PreviousRunner.Alloc().Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
update.ModifyIndex++
update.AllocModifyIndex++
broadcaster := conf.PreviousRunner.(*fakeAllocRunner).Broadcaster
err := broadcaster.Send(update)
require.NoError(t, err)
// Assert watcher is still waiting because alloc isn't terminal
testutil.WaitForResult(func() (bool, error) {
return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Stop the previous alloc and assert watcher stops blocking
update = update.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
update.ClientStatus = structs.AllocClientStatusComplete
update.ModifyIndex++
update.AllocModifyIndex++
err = broadcaster.Send(update)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
}
// TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti asserts that when multiple
// prevAllocs are set, a groupPrevAllocWatcher blocks until all of them are
// complete
func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) {
t.Parallel()
conf1, cleanup1 := newConfig(t)
defer cleanup1()
conf1.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "500ms",
}
conf2, cleanup2 := newConfig(t)
defer cleanup2()
conf2.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "500ms",
}
waiter1, _ := NewAllocWatcher(conf1)
waiter2, _ := NewAllocWatcher(conf2)
groupWaiter := &groupPrevAllocWatcher{
prevAllocs: []PrevAllocWatcher{
waiter1,
waiter2,
},
}
terminalBroadcastFn := func(cfg Config) {
update := cfg.PreviousRunner.Alloc().Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
update.ClientStatus = structs.AllocClientStatusComplete
update.ModifyIndex++
update.AllocModifyIndex++
broadcaster := cfg.PreviousRunner.(*fakeAllocRunner).Broadcaster
err := broadcaster.Send(update)
require.NoError(t, err)
}
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer cancel()
groupWaiter.Wait(ctx)
}()
// Assert watcher is waiting
testutil.WaitForResult(func() (bool, error) {
return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Broadcast a terminal alloc update to the first watcher
terminalBroadcastFn(conf1)
// Assert watcher is still waiting because the second alloc isn't terminal
testutil.WaitForResult(func() (bool, error) {
return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
// Broadcast a terminal alloc update to the second watcher
terminalBroadcastFn(conf2)
testutil.WaitForResult(func() (bool, error) {
return !groupWaiter.IsWaiting(), fmt.Errorf("did not expect watcher to be waiting")
}, func(err error) {
t.Fatalf("error: %v", err)
})
}

View file

@@ -861,6 +861,7 @@ func (c *Client) restoreState() error {
// we need the local AllocRunners initialized first. We could
// add a second loop to initialize just the alloc watcher.
prevAllocWatcher := allocwatcher.NoopPrevAlloc{}
+prevAllocMigrator := allocwatcher.NoopPrevAlloc{}
c.configLock.RLock()
arConf := &allocrunner.Config{
@@ -873,6 +874,7 @@
Consul: c.consulService,
Vault: c.vaultClient,
PrevAllocWatcher: prevAllocWatcher,
+PrevAllocMigrator: prevAllocMigrator,
PluginLoader: c.config.PluginLoader,
PluginSingletonLoader: c.config.PluginSingletonLoader,
DeviceManager: c.devicemanager,
@@ -2028,17 +2030,27 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
return err
}
+// Collect any preempted allocations to pass into the previous alloc watcher
+var preemptedAllocs map[string]allocwatcher.AllocRunnerMeta
+if len(alloc.PreemptedAllocations) > 0 {
+preemptedAllocs = make(map[string]allocwatcher.AllocRunnerMeta)
+for _, palloc := range alloc.PreemptedAllocations {
+preemptedAllocs[palloc] = c.allocs[palloc]
+}
+}
// Since only the Client has access to other AllocRunners and the RPC
// client, create the previous allocation watcher here.
watcherConfig := allocwatcher.Config{
-Alloc: alloc,
-PreviousRunner: c.allocs[alloc.PreviousAllocation],
-RPC: c,
-Config: c.configCopy,
-MigrateToken: migrateToken,
-Logger: c.logger,
+Alloc: alloc,
+PreviousRunner: c.allocs[alloc.PreviousAllocation],
+PreemptedRunners: preemptedAllocs,
+RPC: c,
+Config: c.configCopy,
+MigrateToken: migrateToken,
+Logger: c.logger,
}
-prevAllocWatcher := allocwatcher.NewAllocWatcher(watcherConfig)
+prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
@@ -2054,6 +2066,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
StateUpdater: c,
DeviceStatsReporter: c,
PrevAllocWatcher: prevAllocWatcher,
+PrevAllocMigrator: prevAllocMigrator,
PluginLoader: c.config.PluginLoader,
PluginSingletonLoader: c.config.PluginSingletonLoader,
DeviceManager: c.devicemanager,