Block chained allocations until the previous allocation has terminated

Diptanu Choudhury 2016-08-22 11:34:24 -05:00
parent f814e13e35
commit 4ca623bcfe
7 changed files with 349 additions and 7 deletions
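In brief, the mechanism added here: when the client is asked to start an allocation whose PreviousAllocation is still running, it parks the new allocation in a map keyed by the predecessor's ID, and releases it once an allocation update shows the predecessor has reached a terminal client state. A minimal, self-contained sketch of that flow, using simplified stand-in types rather than the real Nomad structs or client API:

package main

import "fmt"

type Alloc struct {
	ID                 string
	PreviousAllocation string
	ClientStatus       string
}

// Terminated mirrors the check added in this commit: terminal client states.
func (a *Alloc) Terminated() bool {
	switch a.ClientStatus {
	case "failed", "complete", "lost":
		return true
	}
	return false
}

type Client struct {
	running map[string]*Alloc // allocs the client is currently running
	blocked map[string]*Alloc // blocked alloc, keyed by its predecessor's ID
}

// Add starts an alloc immediately, or parks it while its predecessor lives.
func (c *Client) Add(a *Alloc) {
	if prev, ok := c.running[a.PreviousAllocation]; ok && !prev.Terminated() {
		c.blocked[a.PreviousAllocation] = a
		return
	}
	c.running[a.ID] = a
}

// OnUpdate releases a blocked successor once its predecessor terminates.
func (c *Client) OnUpdate(a *Alloc) {
	if next, ok := c.blocked[a.ID]; ok && a.Terminated() {
		delete(c.blocked, a.ID)
		c.Add(next)
	}
}

func main() {
	c := &Client{running: map[string]*Alloc{}, blocked: map[string]*Alloc{}}
	old := &Alloc{ID: "a1", ClientStatus: "running"}
	c.running[old.ID] = old

	c.Add(&Alloc{ID: "a2", PreviousAllocation: "a1", ClientStatus: "pending"})
	fmt.Println(len(c.blocked)) // 1: a2 waits on a1

	old.ClientStatus = "complete"
	c.OnUpdate(old)
	fmt.Println(len(c.blocked), len(c.running)) // 0 2: a2 started
}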


@@ -128,6 +128,11 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex
// blockedAllocations are allocations which are blocked because the
// allocations they replace haven't terminated yet; keyed by the
// previous allocation's ID
blockedAllocations map[string]*structs.Allocation
blockedAllocsLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
@@ -155,6 +160,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}
@@ -966,6 +972,18 @@ func (c *Client) allocSync() {
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
// If this alloc was blocking another alloc and transitioned to a
// terminal state then start the blocked allocation
c.blockedAllocsLock.Lock()
if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() {
if err := c.addAlloc(blockedAlloc); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
blockedAlloc.ID, err)
}
delete(c.blockedAllocations, blockedAlloc.PreviousAllocation)
}
c.blockedAllocsLock.Unlock()
case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
@@ -1191,6 +1209,16 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Start the new allocations
for _, add := range diff.added {
// If the allocation is chained and the previous allocation hasn't
// terminated yet, then add the alloc to the blocked queue.
if ar, ok := c.getAllocRunners()[add.PreviousAllocation]; ok && !ar.Alloc().Terminated() {
c.logger.Printf("[DEBUG] client: added alloc %q to blocked queue", add.ID)
c.blockedAllocsLock.Lock()
c.blockedAllocations[add.PreviousAllocation] = add
c.blockedAllocsLock.Unlock()
continue
}
if err := c.addAlloc(add); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
add.ID, err)


@@ -598,3 +598,96 @@ func TestClient_Init(t *testing.T) {
t.Fatalf("err: %s", err)
}
}
func TestClient_BlockedAllocations(t *testing.T) {
s1, _ := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
c1 := testClient(t, func(c *config.Config) {
c.RPCHandler = s1
})
// Wait for the node to be ready
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
out, err := state.NodeByID(c1.Node().ID)
if err != nil {
return false, err
}
if out == nil || out.Status != structs.NodeStatusReady {
return false, fmt.Errorf("bad node: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add an allocation
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"kill_after": "1s",
"run_for": "100s",
"exit_code": 0,
"exit_signal": 0,
"exit_err": "",
}
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(100, []*structs.Allocation{alloc})
// Wait until the client downloads and starts the allocation
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
if err != nil {
return false, err
}
if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("bad alloc: %#v", out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Add a new chained alloc
alloc2 := alloc.Copy()
alloc2.ID = structs.GenerateUUID()
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
alloc2.PreviousAllocation = alloc.ID
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure that the chained allocation is being tracked as blocked
testutil.WaitForResult(func() (bool, error) {
// Hold the read lock: the client mutates this map concurrently
c1.blockedAllocsLock.RLock()
defer c1.blockedAllocsLock.RUnlock()
alloc, ok := c1.blockedAllocations[alloc2.PreviousAllocation]
if ok && alloc.ID == alloc2.ID {
return true, nil
}
return false, fmt.Errorf("no blocked allocations")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Change the desired state of the parent alloc to stop
alloc1 := alloc.Copy()
alloc1.DesiredStatus = structs.AllocDesiredStatusStop
if err := state.UpsertAllocs(300, []*structs.Allocation{alloc1}); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure that there are no blocked allocations
testutil.WaitForResult(func() (bool, error) {
c1.blockedAllocsLock.RLock()
defer c1.blockedAllocsLock.RUnlock()
_, ok := c1.blockedAllocations[alloc2.PreviousAllocation]
if ok {
return false, fmt.Errorf("blocked allocations present")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}


@@ -19,12 +19,13 @@ import (
// BuiltinDrivers contains the built in registered drivers
// which are available for allocation handling
var BuiltinDrivers = map[string]Factory{
"docker": NewDockerDriver,
"exec": NewExecDriver,
"raw_exec": NewRawExecDriver,
"java": NewJavaDriver,
"qemu": NewQemuDriver,
"rkt": NewRktDriver,
"docker": NewDockerDriver,
"exec": NewExecDriver,
"raw_exec": NewRawExecDriver,
"java": NewJavaDriver,
"qemu": NewQemuDriver,
"rkt": NewRktDriver,
"mock_driver": NewMockDriver,
}
// NewDriver is used to instantiate and return a new driver
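Registering "mock_driver" here is what lets the client test above set a task's Driver to that name. For context, a hedged sketch of the registry/factory pattern this map feeds, with toy types (the real Factory takes a DriverContext):

package main

import "fmt"

type Driver interface{ Name() string }

type Factory func() Driver

type mockDriver struct{}

func (mockDriver) Name() string { return "mock_driver" }

// A toy registry in the same shape as BuiltinDrivers above.
var builtinDrivers = map[string]Factory{
	"mock_driver": func() Driver { return mockDriver{} },
}

// newDriver resolves a driver name against the registry; an unregistered
// name fails fast, which is why the mock_driver entry above must exist
// before a task can name it.
func newDriver(name string) (Driver, error) {
	factory, ok := builtinDrivers[name]
	if !ok {
		return nil, fmt.Errorf("unknown driver %q", name)
	}
	return factory(), nil
}

func main() {
	d, err := newDriver("mock_driver")
	if err != nil {
		panic(err)
	}
	fmt.Println(d.Name()) // mock_driver
}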


@@ -0,0 +1,168 @@
package driver
import (
"errors"
"log"
"time"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/nomad/client/config"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
// MockDriverConfig is the driver configuration for the MockDriver
type MockDriverConfig struct {
// KillAfter is the duration the mock driver waits after the initial SIGINT
// before indicating that the task has exited
KillAfter time.Duration `mapstructure:"kill_after"`
// RunFor is the duration for which the fake task runs. After this
// period the MockDriver reports that the task has terminated
RunFor time.Duration `mapstructure:"run_for"`
// ExitCode is the exit code with which the MockDriver indicates the task
// has exited
ExitCode int `mapstructure:"exit_code"`
// ExitSignal is the signal with which the MockDriver indicates the task has
// been killed
ExitSignal int `mapstructure:"exit_signal"`
// ExitErrMsg is the error message that the task returns while exiting
ExitErrMsg string `mapstructure:"exit_err_msg"`
}
// MockDriver is a driver which is used for testing purposes
type MockDriver struct {
DriverContext
fingerprint.StaticFingerprinter
}
// NewMockDriver is a factory method which returns a new Mock Driver
func NewMockDriver(ctx *DriverContext) Driver {
return &MockDriver{DriverContext: *ctx}
}
// Start starts the mock driver
func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig MockDriverConfig
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &driverConfig,
})
if err != nil {
return nil, err
}
if err := dec.Decode(task.Config); err != nil {
return nil, err
}
h := mockDriverHandle{
taskName: task.Name,
runFor: driverConfig.RunFor,
killAfter: driverConfig.KillAfter,
killTimeout: task.KillTimeout,
exitCode: driverConfig.ExitCode,
exitSignal: driverConfig.ExitSignal,
logger: m.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
}
if driverConfig.ExitErrMsg != "" {
h.exitErr = errors.New(driverConfig.ExitErrMsg)
}
m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name)
go h.run()
return &h, nil
}
// TODO implement Open when we need it.
// Open re-connects the driver to the running task
func (m *MockDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
return nil, nil
}
// TODO implement Validate when we need it.
// Validate validates the mock driver configuration
func (m *MockDriver) Validate(map[string]interface{}) error {
return nil
}
// Fingerprint fingerprints a node and returns whether the MockDriver is enabled
func (m *MockDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
node.Attributes["driver.mock_driver"] = "1"
return true, nil
}
// mockDriverHandle is a driver handle which supervises a mock task
type mockDriverHandle struct {
taskName string
runFor time.Duration
killAfter time.Duration
killTimeout time.Duration
exitCode int
exitSignal int
exitErr error
logger *log.Logger
waitCh chan *dstructs.WaitResult
doneCh chan struct{}
}
// TODO Implement when we need it.
func (h *mockDriverHandle) ID() string {
return ""
}
// TODO Implement when we need it.
func (h *mockDriverHandle) WaitCh() chan *dstructs.WaitResult {
return h.waitCh
}
// TODO Implement when we need it.
func (h *mockDriverHandle) Update(task *structs.Task) error {
return nil
}
// Kill kills a mock task
func (h *mockDriverHandle) Kill() error {
h.logger.Printf("[DEBUG] driver.mock: killing task %q after kill timeout: %v", h.taskName, h.killTimeout)
select {
case <-h.doneCh:
case <-time.After(h.killAfter):
close(h.doneCh)
case <-time.After(h.killTimeout):
h.logger.Printf("[DEBUG] driver.mock: terminating task %q", h.taskName)
close(h.doneCh)
}
return nil
}
// TODO Implement when we need it.
func (h *mockDriverHandle) Stats() (*cstructs.TaskResourceUsage, error) {
return nil, nil
}
// run waits for the configured amount of time and then indicates the task has
// terminated
func (h *mockDriverHandle) run() {
timer := time.NewTimer(h.runFor)
defer timer.Stop()
for {
select {
case <-timer.C:
close(h.doneCh)
case <-h.doneCh:
h.logger.Printf("[DEBUG] driver.mock: finished running task %q", h.taskName)
h.waitCh <- dstructs.NewWaitResult(h.exitCode, h.exitSignal, h.exitErr)
return
}
}
}
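The decoding in Start above leans on mapstructure's duration hook and weak typing, which is what lets a task Config express durations as strings like "100s". A standalone sketch of just that decode step, reusing the tags of MockDriverConfig with illustrative values:

package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// Same shape and tags as MockDriverConfig above.
type MockDriverConfig struct {
	KillAfter  time.Duration `mapstructure:"kill_after"`
	RunFor     time.Duration `mapstructure:"run_for"`
	ExitCode   int           `mapstructure:"exit_code"`
	ExitSignal int           `mapstructure:"exit_signal"`
	ExitErrMsg string        `mapstructure:"exit_err_msg"`
}

func main() {
	var cfg MockDriverConfig
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// The hook turns "100s" into a time.Duration; weak typing
		// tolerates loosely typed map values.
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
		WeaklyTypedInput: true,
		Result:           &cfg,
	})
	if err != nil {
		panic(err)
	}
	// Illustrative values in the same shape a task's Config map carries.
	if err := dec.Decode(map[string]interface{}{
		"kill_after": "1s",
		"run_for":    "100s",
		"exit_code":  0,
	}); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {KillAfter:1s RunFor:1m40s ExitCode:0 ExitSignal:0 ExitErrMsg:}
}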


@@ -507,7 +507,9 @@ func (r *TaskRunner) collectResourceUsageStats(stopCollection <-chan struct{}) {
r.resourceUsageLock.Lock()
r.resourceUsage = ru
r.resourceUsageLock.Unlock()
r.emitStats(ru)
if ru != nil {
r.emitStats(ru)
}
case <-stopCollection:
return
}


@@ -2632,6 +2632,16 @@ func (a *Allocation) TerminalStatus() bool {
}
}
// Terminated returns whether the allocation is in a terminal state on the client.
func (a *Allocation) Terminated() bool {
if a.ClientStatus == AllocClientStatusFailed ||
a.ClientStatus == AllocClientStatusComplete ||
a.ClientStatus == AllocClientStatusLost {
return true
}
return false
}
// RanSuccessfully returns whether the client has run the allocation and all
// tasks finished successfully
func (a *Allocation) RanSuccessfully() bool {
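Worth noting: the pre-existing TerminalStatus keys off DesiredStatus, while the new Terminated keys off ClientStatus, and the blocked-allocation gate uses the latter on purpose: a stop instruction from the scheduler doesn't mean the task has actually exited yet. A toy contrast, where plain strings stand in for the real status constants and the TerminalStatus body is assumed from its conventional shape:

package main

import "fmt"

type Allocation struct {
	DesiredStatus string // what the scheduler wants: "run", "stop", ...
	ClientStatus  string // what actually happened: "running", "complete", ...
}

// TerminalStatus checks the desired state (assumed shape).
func (a *Allocation) TerminalStatus() bool {
	switch a.DesiredStatus {
	case "stop", "evict", "failed":
		return true
	}
	return false
}

// Terminated checks the observed client state, as in this commit.
func (a *Allocation) Terminated() bool {
	switch a.ClientStatus {
	case "failed", "complete", "lost":
		return true
	}
	return false
}

func main() {
	// Stop has been requested but the task is still draining: a chained
	// replacement must keep waiting, which only Terminated() captures.
	a := &Allocation{DesiredStatus: "stop", ClientStatus: "running"}
	fmt.Println(a.TerminalStatus(), a.Terminated()) // true false
}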


@@ -1091,3 +1091,43 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) {
}
}
}
func TestAllocation_Terminated(t *testing.T) {
type desiredState struct {
ClientStatus string
DesiredStatus string
Terminated bool
}
harness := []desiredState{
{
ClientStatus: AllocClientStatusPending,
DesiredStatus: AllocDesiredStatusStop,
Terminated: false,
},
{
ClientStatus: AllocClientStatusRunning,
DesiredStatus: AllocDesiredStatusStop,
Terminated: false,
},
{
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusStop,
Terminated: true,
},
{
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
Terminated: true,
},
}
for _, state := range harness {
alloc := Allocation{}
alloc.DesiredStatus = state.DesiredStatus
alloc.ClientStatus = state.ClientStatus
if alloc.Terminated() != state.Terminated {
t.Fatalf("expected: %v, actual: %v", state.Terminated, alloc.Terminated())
}
}
}