allocwatcher: Add Group AllocWatcher

The Group Alloc watcher is an implementation of a PrevAllocWatcher that
can wait on multiple previous allocs before returning.

This is to be used when running an allocation that is preempting upstream
allocations, and thus it only supports being run with a local alloc watcher.

It also currently requires all of its child watchers to correctly handle
context cancellation. Should this be a problem, it should be fairly easy
to implement a replacement using channels rather than a waitgroup.

It obeys the PrevAllocWatcher interface for convenience, but it may be
better to extract Migration capabilities into a separate interface for
greater clarity.
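
As a rough usage sketch (not part of this change; waitForPreviousAllocs is an
invented helper name), a caller holding several local watchers, for example
ones built with NewAllocWatcher, could compose and block on them like this:

    // Sketch only: block until every previous allocation is terminal, or
    // until the context is cancelled. NewGroupAllocWatcher rejects any
    // watcher that is not a local watcher.
    func waitForPreviousAllocs(ctx context.Context, watchers ...PrevAllocWatcher) error {
        group, err := NewGroupAllocWatcher(watchers...)
        if err != nil {
            return err
        }
        return group.Wait(ctx)
    }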
Danielle Tomlinson 2018-12-05 18:17:33 +01:00
parent 457c6eb398
commit 2cdef6a7b4
2 changed files with 249 additions and 0 deletions

@@ -0,0 +1,92 @@
package allocwatcher

import (
    "context"
    "errors"
    "sync"

    multierror "github.com/hashicorp/go-multierror"
    "github.com/hashicorp/nomad/client/allocdir"
)

type groupPrevAllocWatcher struct {
    prevAllocs []PrevAllocWatcher
    wg         sync.WaitGroup

    // waiting is true while the alloc runner is waiting on the
    // prevAllocWatcher. Writers must acquire waitingLock and readers
    // should use the helper methods IsWaiting and IsMigrating.
    waiting     bool
    waitingLock sync.RWMutex
}

func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) (PrevAllocWatcher, error) {
    for _, watcher := range watchers {
        _, ok := watcher.(*localPrevAlloc)
        if !ok {
            return nil, errors.New("PrevAllocWatchers must all be local watchers")
        }
    }

    return &groupPrevAllocWatcher{
        prevAllocs: watchers,
    }, nil
}

// Wait waits for the previous allocs to become terminal or exit, or returns
// early on context cancellation. Usage of the groupPrevAllocWatcher requires
// that all sub-watchers correctly handle context cancellation.
// We may need to adjust this to use channels rather than a wait group if we
// wish to more strictly enforce timeouts.
func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error {
    g.waitingLock.Lock()
    g.waiting = true
    g.waitingLock.Unlock()
    defer func() {
        g.waitingLock.Lock()
        g.waiting = false
        g.waitingLock.Unlock()
    }()

    var merr multierror.Error
    var errmu sync.Mutex

    g.wg.Add(len(g.prevAllocs))

    for _, alloc := range g.prevAllocs {
        go func(ctx context.Context, alloc PrevAllocWatcher) {
            defer g.wg.Done()
            err := alloc.Wait(ctx)
            if err != nil {
                errmu.Lock()
                merr.Errors = append(merr.Errors, err)
                errmu.Unlock()
            }
        }(ctx, alloc)
    }

    g.wg.Wait()

    // Check ctx.Err first, to avoid returning a multierror built from the
    // ctx.Err of the prevAlloc Wait goroutines.
    if err := ctx.Err(); err != nil {
        return err
    }

    return merr.ErrorOrNil()
}

func (g *groupPrevAllocWatcher) Migrate(ctx context.Context, dest *allocdir.AllocDir) error {
    return errors.New("Migration unimplemented for a groupPrevAllocWatcher")
}

func (g *groupPrevAllocWatcher) IsWaiting() bool {
    g.waitingLock.RLock()
    defer g.waitingLock.RUnlock()

    return g.waiting
}

func (g *groupPrevAllocWatcher) IsMigrating() bool {
    return false
}
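
Both the commit message and the comment on Wait note that the wait-group
approach depends on every child watcher honouring context cancellation. A
channel-based replacement, as hinted at there, might look roughly like the
following sketch (not part of this change; waitWithChannels is an invented
name):

// Hypothetical sketch, not part of this change: a channel-based variant of
// Wait. Each child watcher reports its result on done; the select below lets
// the group stop blocking as soon as the context is cancelled, even if a
// child watcher ignores cancellation.
func (g *groupPrevAllocWatcher) waitWithChannels(ctx context.Context) error {
    done := make(chan error, len(g.prevAllocs))
    for _, alloc := range g.prevAllocs {
        go func(alloc PrevAllocWatcher) {
            done <- alloc.Wait(ctx)
        }(alloc)
    }

    var merr multierror.Error
    for range g.prevAllocs {
        select {
        case err := <-done:
            if err != nil {
                merr.Errors = append(merr.Errors, err)
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return merr.ErrorOrNil()
}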

@@ -0,0 +1,157 @@
package allocwatcher

import (
    "context"
    "fmt"
    "testing"

    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/testutil"
    "github.com/stretchr/testify/require"
)

// TestPrevAlloc_GroupPrevAllocWatcher_Block asserts that when prevAllocs are
// set, a groupPrevAllocWatcher will block on them.
func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) {
    t.Parallel()
    conf, cleanup := newConfig(t)
    defer cleanup()

    conf.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
        "run_for": "500ms",
    }

    waiter := NewAllocWatcher(conf)
    groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}}

    // Wait in a goroutine with a context to make sure it exits at the right time
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        defer cancel()
        groupWaiter.Wait(ctx)
    }()

    // Assert watcher is waiting
    testutil.WaitForResult(func() (bool, error) {
        return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })

    // Broadcast a non-terminal alloc update to assert only terminal
    // updates break out of waiting.
    update := conf.PreviousRunner.Alloc().Copy()
    update.DesiredStatus = structs.AllocDesiredStatusStop
    update.ModifyIndex++
    update.AllocModifyIndex++

    broadcaster := conf.PreviousRunner.(*fakeAllocRunner).Broadcaster
    err := broadcaster.Send(update)
    require.NoError(t, err)

    // Assert watcher is still waiting because alloc isn't terminal
    testutil.WaitForResult(func() (bool, error) {
        return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })

    // Stop the previous alloc and assert watcher stops blocking
    update = update.Copy()
    update.DesiredStatus = structs.AllocDesiredStatusStop
    update.ClientStatus = structs.AllocClientStatusComplete
    update.ModifyIndex++
    update.AllocModifyIndex++

    err = broadcaster.Send(update)
    require.NoError(t, err)

    testutil.WaitForResult(func() (bool, error) {
        if groupWaiter.IsWaiting() {
            return false, fmt.Errorf("did not expect watcher to be waiting")
        }
        return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })
}

// TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti asserts that when multiple
// prevAllocs are set, a groupPrevAllocWatcher will block until all of them
// are complete.
func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) {
    t.Parallel()
    conf1, cleanup1 := newConfig(t)
    defer cleanup1()
    conf1.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
        "run_for": "500ms",
    }

    conf2, cleanup2 := newConfig(t)
    defer cleanup2()
    conf2.Alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
        "run_for": "500ms",
    }

    waiter1 := NewAllocWatcher(conf1)
    waiter2 := NewAllocWatcher(conf2)

    groupWaiter := &groupPrevAllocWatcher{
        prevAllocs: []PrevAllocWatcher{
            waiter1,
            waiter2,
        },
    }

    terminalBroadcastFn := func(cfg Config) {
        update := cfg.PreviousRunner.Alloc().Copy()
        update.DesiredStatus = structs.AllocDesiredStatusStop
        update.ClientStatus = structs.AllocClientStatusComplete
        update.ModifyIndex++
        update.AllocModifyIndex++

        broadcaster := cfg.PreviousRunner.(*fakeAllocRunner).Broadcaster
        err := broadcaster.Send(update)
        require.NoError(t, err)
    }

    // Wait in a goroutine with a context to make sure it exits at the right time
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        defer cancel()
        groupWaiter.Wait(ctx)
    }()

    // Assert watcher is waiting
    testutil.WaitForResult(func() (bool, error) {
        return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })

    // Broadcast a terminal alloc update to the first watcher
    terminalBroadcastFn(conf1)

    // Assert watcher is still waiting because the second alloc isn't terminal
    testutil.WaitForResult(func() (bool, error) {
        return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })

    // Broadcast a terminal alloc update to the second watcher
    terminalBroadcastFn(conf2)

    testutil.WaitForResult(func() (bool, error) {
        if groupWaiter.IsWaiting() {
            return false, fmt.Errorf("did not expect watcher to be waiting")
        }
        return !groupWaiter.IsMigrating(), fmt.Errorf("did not expect watcher to be migrating")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })
}
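
Not part of this diff, but for completeness, the context-cancellation path
that Wait relies on could be exercised with a sketch along these lines,
reusing the newConfig and NewAllocWatcher helpers the tests above already
depend on (the test name and exact assertions are assumptions):

// Hypothetical sketch: asserts that cancelling the context unblocks the
// group watcher even though the previous alloc never becomes terminal.
func TestPrevAlloc_GroupPrevAllocWatcher_Cancel(t *testing.T) {
    t.Parallel()
    conf, cleanup := newConfig(t)
    defer cleanup()

    waiter := NewAllocWatcher(conf)
    groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    errCh := make(chan error, 1)
    go func() {
        errCh <- groupWaiter.Wait(ctx)
    }()

    // Wait until the group watcher is blocked, then cancel the context.
    testutil.WaitForResult(func() (bool, error) {
        return groupWaiter.IsWaiting(), fmt.Errorf("expected watcher to be waiting")
    }, func(err error) {
        t.Fatalf("error: %v", err)
    })
    cancel()

    // Wait should surface the context error rather than a multierror built
    // from the child watchers' ctx errors.
    require.Equal(t, context.Canceled, <-errCh)
}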