Disk resource monitoring and enforcement
parent 0b9a07b19b
commit 2a17895a83
@@ -218,6 +218,7 @@ const (
 TaskNotRestarting = "Not Restarting"
 TaskDownloadingArtifacts = "Downloading Artifacts"
 TaskArtifactDownloadFailed = "Failed Artifact Download"
+TaskDiskExceeded = "Disk Exceeded"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
@@ -25,6 +25,10 @@ const (
 // update will transfer all past state information. If not other transition
 // has occurred up to this limit, we will send to the server.
 taskReceivedSyncLimit = 30 * time.Second
+
+// watchdogInterval is the interval at which resource constraints for the
+// allocation are being checked and enforced.
+watchdogInterval = 5 * time.Second
 )
 
 // AllocStateUpdater is used to update the status of an allocation
@@ -237,6 +241,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
 if r.allocClientStatus != "" || r.allocClientDescription != "" {
 alloc.ClientStatus = r.allocClientStatus
 alloc.ClientDescription = r.allocClientDescription
+
+// Copy over the task states so we don't lose them
+r.taskStatusLock.RLock()
+alloc.TaskStates = copyTaskStates(r.taskStates)
+r.taskStatusLock.RUnlock()
+
 r.allocLock.Unlock()
 return alloc
 }
@@ -328,7 +338,7 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
 for task, tr := range r.tasks {
 if task != taskName {
 destroyingTasks = append(destroyingTasks, task)
-tr.Destroy()
+tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
 }
 }
 if len(destroyingTasks) > 0 {
@@ -376,7 +386,7 @@ func (r *AllocRunner) Run() {
 // Create the execution context
 r.ctxLock.Lock()
 if r.ctx == nil {
-allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
+allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID), r.Alloc().Resources.DiskMB)
 if err := allocDir.Build(tg.Tasks); err != nil {
 r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
 r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
@@ -413,6 +423,16 @@ func (r *AllocRunner) Run() {
 }
 r.taskLock.Unlock()
 
+// Start watching the shared allocation directory for disk usage
+go r.ctx.AllocDir.StartDiskWatcher()
+
+watchdog := time.NewTicker(watchdogInterval)
+defer watchdog.Stop()
+
+// taskDestroyEvent contains an event that caused the destroyment of a task
+// in the allocation.
+var taskDestroyEvent *structs.TaskEvent
+
 OUTER:
 // Wait for updates
 for {
@@ -425,6 +445,7 @@ OUTER:
 
 // Check if we're in a terminal status
 if update.TerminalStatus() {
+taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
 break OUTER
 }
 
@@ -433,7 +454,14 @@
 for _, tr := range runners {
 tr.Update(update)
 }
+case <-watchdog.C:
+if event, desc := r.checkResources(); event != nil {
+r.setStatus(structs.AllocClientStatusFailed, desc)
+taskDestroyEvent = event
+break OUTER
+}
 case <-r.destroyCh:
+taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
 break OUTER
 }
 }
@@ -441,7 +469,7 @@ OUTER:
 // Destroy each sub-task
 runners := r.getTaskRunners()
 for _, tr := range runners {
-tr.Destroy()
+tr.Destroy(taskDestroyEvent)
 }
 
 // Wait for termination of the task runners
@@ -452,11 +480,26 @@ OUTER:
 // Final state sync
 r.syncStatus()
 
+// Stop watching the shared allocation directory
+r.ctx.AllocDir.StopDiskWatcher()
+
 // Block until we should destroy the state of the alloc
 r.handleDestroy()
 r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
 }
 
+// checkResources monitors and enforces alloc resource usage. It returns an
+// appropriate task event describing why the allocation had to be killed.
+func (r *AllocRunner) checkResources() (*structs.TaskEvent, string) {
+diskSize := r.ctx.AllocDir.GetSize()
+diskLimit := r.Alloc().Resources.DiskInBytes()
+if diskSize > diskLimit {
+return structs.NewTaskEvent(structs.TaskDiskExceeded).SetDiskLimit(diskLimit).SetDiskSize(diskSize),
+"shared allocation directory exceeded the allowed disk space"
+}
+return nil, ""
+}
+
 // handleDestroy blocks till the AllocRunner should be destroyed and does the
 // necessary cleanup.
 func (r *AllocRunner) handleDestroy() {
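
Taken together, the AllocRunner hunks above wire disk enforcement into the allocation's main loop: a watchdog ticker fires every watchdogInterval (5s), checkResources compares AllocDir.GetSize() against Resources.DiskInBytes(), and on a violation the runner marks the allocation failed and destroys its tasks with a disk-exceeded event. Below is a minimal, standalone sketch of that ticker-plus-check pattern, not code from this commit; checkDisk, the hard-coded sizes, and the shortened tick interval are illustrative stand-ins.

package main

import (
    "fmt"
    "time"
)

// checkDisk stands in for AllocRunner.checkResources: it compares the measured
// size of the shared directory against the allocation's disk limit in bytes.
func checkDisk(sizeFn func() int64, limit int64) (string, bool) {
    if size := sizeFn(); size > limit {
        return fmt.Sprintf("shared allocation directory exceeded the allowed disk space (%d > %d bytes)", size, limit), true
    }
    return "", false
}

func main() {
    const mb = 1024 * 1024
    sizeFn := func() int64 { return 25 * mb } // pretend 25 MB are in use
    limit := int64(20 * mb)                   // pretend the alloc requested 20 MB

    watchdog := time.NewTicker(100 * time.Millisecond) // watchdogInterval is 5s in the real runner
    defer watchdog.Stop()
    destroyCh := make(chan struct{}) // closed when the alloc is destroyed

    for {
        select {
        case <-watchdog.C:
            if desc, exceeded := checkDisk(sizeFn, limit); exceeded {
                fmt.Println("failing alloc:", desc) // the runner sets AllocClientStatusFailed here
                return                              // ...and destroys the tasks with a TaskDiskExceeded event
            }
        case <-destroyCh:
            return
        }
    }
}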
@@ -1,6 +1,7 @@
 package client
 
 import (
+"bufio"
 "fmt"
 "os"
 "testing"
@@ -149,6 +150,112 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
 })
 }
 
+func TestAllocRunner_DiskExceeded_Destroy(t *testing.T) {
+ctestutil.ExecCompatible(t)
+upd, ar := testAllocRunner(false)
+
+// Ensure task takes some time
+task := ar.alloc.Job.TaskGroups[0].Tasks[0]
+task.Config["command"] = "/bin/sleep"
+task.Config["args"] = []string{"60"}
+go ar.Run()
+
+testutil.WaitForResult(func() (bool, error) {
+if upd.Count == 0 {
+return false, fmt.Errorf("No updates")
+}
+last := upd.Allocs[upd.Count-1]
+if last.ClientStatus != structs.AllocClientStatusRunning {
+return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
+}
+return true, nil
+}, func(err error) {
+t.Fatalf("err: %v", err)
+})
+
+// Create a 20mb file in the shared alloc directory, which should cause the
+// allocation to terminate in a failed state.
+name := ar.ctx.AllocDir.SharedDir + "/20mb.bin"
+f, err := os.Create(name)
+if err != nil {
+t.Fatal("unable to create file: %v", err)
+}
+
+defer func() {
+if err := f.Close(); err != nil {
+t.Fatal("unable to close file: %v", err)
+}
+os.Remove(name)
+}()
+
+// write 20 megabytes (1280 * 16384 bytes) of zeros to the file
+w := bufio.NewWriter(f)
+buf := make([]byte, 16384)
+for i := 0; i < 1280; i++ {
+if _, err := w.Write(buf); err != nil {
+t.Fatal("unable to write to file: %v", err)
+}
+}
+
+testutil.WaitForResult(func() (bool, error) {
+if upd.Count == 0 {
+return false, nil
+}
+
+// Check the status has changed.
+last := upd.Allocs[upd.Count-1]
+if last.ClientStatus != structs.AllocClientStatusFailed {
+return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
+}
+
+// Check the state still exists
+if _, err := os.Stat(ar.stateFilePath()); err != nil {
+return false, fmt.Errorf("state file destroyed: %v", err)
+}
+
+// Check the alloc directory still exists
+if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
+return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
+}
+
+return true, nil
+}, func(err error) {
+t.Fatalf("err: %v", err)
+})
+
+// Send the destroy signal and ensure the AllocRunner cleans up.
+ar.Destroy()
+
+testutil.WaitForResult(func() (bool, error) {
+if upd.Count == 0 {
+return false, nil
+}
+
+// Check the status has changed.
+last := upd.Allocs[upd.Count-1]
+if last.ClientStatus != structs.AllocClientStatusFailed {
+return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
+}
+
+// Check the state was cleaned
+if _, err := os.Stat(ar.stateFilePath()); err == nil {
+return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
+} else if !os.IsNotExist(err) {
+return false, fmt.Errorf("stat err: %v", err)
+}
+
+// Check the alloc directory was cleaned
+if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
+return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
+} else if !os.IsNotExist(err) {
+return false, fmt.Errorf("stat err: %v", err)
+}
+
+return true, nil
+}, func(err error) {
+t.Fatalf("err: %v", err)
+})
+}
 func TestAllocRunner_Destroy(t *testing.T) {
 ctestutil.ExecCompatible(t)
 upd, ar := testAllocRunner(false)
@@ -4,8 +4,11 @@ import (
 "fmt"
 "io"
 "io/ioutil"
+"log"
+"math"
 "os"
 "path/filepath"
+"sync"
 "time"
 
 "gopkg.in/tomb.v1"
@@ -15,6 +18,20 @@ import (
 "github.com/hpcloud/tail/watch"
 )
 
+const (
+// The minimum frequency to use for disk monitoring.
+minCheckDiskInterval = 3 * time.Minute
+
+// The maximum frequency to use for disk monitoring.
+maxCheckDiskInterval = 15 * time.Second
+
+// The amount of time that maxCheckDiskInterval is always used after
+// starting the allocation. This prevents unbounded disk usage that would
+// otherwise be possible for a number of minutes if we started with the
+// minCheckDiskInterval.
+checkDiskMaxEnforcePeriod = 5 * time.Minute
+)
+
 var (
 // The name of the directory that is shared across tasks in a task group.
 SharedAllocName = "alloc"
@@ -44,6 +61,33 @@ type AllocDir struct {
 
 // TaskDirs is a mapping of task names to their non-shared directory.
 TaskDirs map[string]string
+
+// Size is the total consumed disk size of the shared directory in bytes
+size int64
+sizeLock sync.RWMutex
+
+// The minimum frequency to use for disk monitoring.
+MinCheckDiskInterval time.Duration
+
+// The maximum frequency to use for disk monitoring.
+MaxCheckDiskInterval time.Duration
+
+// The amount of time that maxCheckDiskInterval is always used after
+// starting the allocation. This prevents unbounded disk usage that would
+// otherwise be possible for a number of minutes if we started with the
+// minCheckDiskInterval.
+CheckDiskMaxEnforcePeriod time.Duration
+
+// running reflects the state of the disk watcher process.
+running bool
+
+// watchCh signals that the alloc directory is being torn down and that
+// any monitoring on it should stop.
+watchCh chan struct{}
+
+// maxSize represents the total amount of megabytes that the shared allocation
+// directory is allowed to consume.
+maxSize int
 }
 
 // AllocFileInfo holds information about a file inside the AllocDir
@@ -64,14 +108,24 @@ type AllocDirFS interface {
 ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
 }
 
-func NewAllocDir(allocDir string) *AllocDir {
-d := &AllocDir{AllocDir: allocDir, TaskDirs: make(map[string]string)}
+// NewAllocDir initializes the AllocDir struct with allocDir as base path for
+// the allocation directory and maxSize as the maximum allowed size in megabytes.
+func NewAllocDir(allocDir string, maxSize int) *AllocDir {
+d := &AllocDir{
+AllocDir: allocDir,
+MaxCheckDiskInterval: maxCheckDiskInterval,
+MinCheckDiskInterval: minCheckDiskInterval,
+CheckDiskMaxEnforcePeriod: checkDiskMaxEnforcePeriod,
+TaskDirs: make(map[string]string),
+maxSize: maxSize,
+}
 d.SharedDir = filepath.Join(d.AllocDir, SharedAllocName)
 return d
 }
 
 // Tears down previously build directory structure.
 func (d *AllocDir) Destroy() error {
 
 // Unmount all mounted shared alloc dirs.
 var mErr multierror.Error
 if err := d.UnmountAll(); err != nil {
@@ -397,3 +451,82 @@ func (d *AllocDir) pathExists(path string) bool {
 }
 return true
 }
+
+// GetSize returns the size of the shared allocation directory.
+func (d *AllocDir) GetSize() int64 {
+d.sizeLock.Lock()
+defer d.sizeLock.Unlock()
+
+return d.size
+}
+
+// setSize sets the size of the shared allocation directory.
+func (d *AllocDir) setSize(size int64) {
+d.sizeLock.Lock()
+defer d.sizeLock.Unlock()
+
+d.size = size
+}
+
+// StartDiskWatcher periodically checks the disk space consumed by the shared
+// allocation directory.
+func (d *AllocDir) StartDiskWatcher() {
+start := time.Now()
+
+sync := time.NewTimer(d.MaxCheckDiskInterval)
+defer sync.Stop()
+
+d.running = true
+d.watchCh = make(chan struct{})
+
+for {
+select {
+case <-d.watchCh:
+return
+case <-sync.C:
+if err := d.syncDiskUsage(); err != nil {
+log.Printf("[WARN] client: failed to sync disk usage: %v", err)
+}
+// Calculate the disk ratio.
+diskRatio := float64(d.size) / float64(d.maxSize*structs.BytesInMegabyte)
+
+// Exponentially decrease the interval when the disk ratio increases.
+nextInterval := time.Duration(int64(1.0/(0.1*math.Pow(diskRatio, 2))+5)) * time.Second
+
+// Use the maximum interval for the first five minutes or if the
+// disk ratio is sufficiently high. Also use the minimum check interval
+// if the disk ratio becomes low enough.
+if nextInterval < d.MaxCheckDiskInterval || time.Since(start) < d.CheckDiskMaxEnforcePeriod {
+nextInterval = d.MaxCheckDiskInterval
+} else if nextInterval > d.MinCheckDiskInterval {
+nextInterval = d.MinCheckDiskInterval
+}
+sync.Reset(nextInterval)
+}
+}
+}
+
+// StopDiskWatcher closes the watch channel which causes the disk monitoring to stop.
+func (d *AllocDir) StopDiskWatcher() {
+if d.running {
+d.running = false
+close(d.watchCh)
+}
+}
+
+// syncDiskUsage walks the shared allocation directory recursively and
+// calculates the total consumed disk space.
+func (d *AllocDir) syncDiskUsage() error {
+var size int64
+err := filepath.Walk(d.SharedDir,
+func(path string, info os.FileInfo, err error) error {
+// Ignore paths that do not have a valid FileInfo object
+if err == nil {
+size += info.Size()
+}
+return nil
+})
+// Store the disk consumption
+d.setSize(size)
+return err
+}
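
The disk watcher above adapts its polling rate to how full the shared directory is: nextInterval shrinks as the disk ratio grows, clamped between maxCheckDiskInterval (15 seconds, the highest polling frequency) and minCheckDiskInterval (3 minutes, the lowest), and the 15-second interval is always used during the initial checkDiskMaxEnforcePeriod. The snippet below is not part of the commit; it simply evaluates that formula for a few illustrative ratios to show the schedule once the enforce period has passed.

package main

import (
    "fmt"
    "math"
    "time"
)

const (
    minCheckDiskInterval = 3 * time.Minute  // lowest polling frequency (longest interval)
    maxCheckDiskInterval = 15 * time.Second // highest polling frequency (shortest interval)
)

func main() {
    // Assume checkDiskMaxEnforcePeriod has already elapsed, so only the
    // ratio-based clamping applies.
    for _, ratio := range []float64{0.10, 0.50, 0.90, 1.00} {
        next := time.Duration(int64(1.0/(0.1*math.Pow(ratio, 2))+5)) * time.Second
        if next < maxCheckDiskInterval {
            next = maxCheckDiskInterval
        } else if next > minCheckDiskInterval {
            next = minCheckDiskInterval
        }
        fmt.Printf("disk %3.0f%% full -> next check in %v\n", ratio*100, next)
    }
    // Output:
    // disk  10% full -> next check in 3m0s
    // disk  50% full -> next check in 45s
    // disk  90% full -> next check in 17s
    // disk 100% full -> next check in 15s
}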
@@ -52,7 +52,7 @@ func TestAllocDir_BuildAlloc(t *testing.T) {
 }
 defer os.RemoveAll(tmp)
 
-d := NewAllocDir(tmp)
+d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
 defer d.Destroy()
 tasks := []*structs.Task{t1, t2}
 if err := d.Build(tasks); err != nil {
@@ -83,7 +83,7 @@ func TestAllocDir_LogDir(t *testing.T) {
 }
 defer os.RemoveAll(tmp)
 
-d := NewAllocDir(tmp)
+d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
 defer d.Destroy()
 
 expected := filepath.Join(d.AllocDir, SharedAllocName, LogDirName)
@@ -99,7 +99,7 @@ func TestAllocDir_EmbedNonExistent(t *testing.T) {
 }
 defer os.RemoveAll(tmp)
 
-d := NewAllocDir(tmp)
+d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
 defer d.Destroy()
 tasks := []*structs.Task{t1, t2}
 if err := d.Build(tasks); err != nil {
@@ -121,7 +121,7 @@ func TestAllocDir_EmbedDirs(t *testing.T) {
 }
 defer os.RemoveAll(tmp)
 
-d := NewAllocDir(tmp)
+d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
 defer d.Destroy()
 tasks := []*structs.Task{t1, t2}
 if err := d.Build(tasks); err != nil {
@@ -182,7 +182,7 @@ func TestAllocDir_MountSharedAlloc(t *testing.T) {
 }
 defer os.RemoveAll(tmp)
 
-d := NewAllocDir(tmp)
+d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
 defer d.Destroy()
 tasks := []*structs.Task{t1, t2}
 if err := d.Build(tasks); err != nil {
@@ -78,7 +78,7 @@ func testConfig() *config.Config {
 
 func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
 cfg := testConfig()
-allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()))
+allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()), task.Resources.DiskMB)
 allocDir.Build([]*structs.Task{task})
 alloc := mock.Alloc()
 execCtx := NewExecContext(allocDir, alloc.ID)
@@ -11,14 +11,13 @@ import (
 "testing"
 "time"
 
-"github.com/mitchellh/go-ps"
-
 "github.com/hashicorp/nomad/client/allocdir"
 "github.com/hashicorp/nomad/client/driver/env"
 "github.com/hashicorp/nomad/client/testutil"
 "github.com/hashicorp/nomad/nomad/mock"
 "github.com/hashicorp/nomad/nomad/structs"
 tu "github.com/hashicorp/nomad/testutil"
+"github.com/mitchellh/go-ps"
 )
 
 var (
@@ -38,7 +37,7 @@ func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) {
 alloc := mock.Alloc()
 task := alloc.Job.TaskGroups[0].Tasks[0]
 
-allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID))
+allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID), task.Resources.DiskMB)
 if err := allocDir.Build([]*structs.Task{task}); err != nil {
 log.Panicf("allocDir.Build() failed: %v", err)
 }
@@ -65,10 +65,11 @@ type TaskRunner struct {
 // downloaded
 artifactsDownloaded bool
 
-destroy     bool
-destroyCh   chan struct{}
-destroyLock sync.Mutex
-waitCh      chan struct{}
+destroy      bool
+destroyCh    chan struct{}
+destroyLock  sync.Mutex
+destroyEvent *structs.TaskEvent
+waitCh       chan struct{}
 }
 
 // taskRunnerState is used to snapshot the state of the task runner
@@ -298,7 +299,7 @@ func (r *TaskRunner) validateTask() error {
 }
 
 func (r *TaskRunner) run() {
-// Predeclare things so we an jump to the RESTART
+// Predeclare things so we can jump to the RESTART
 var handleEmpty bool
 var stopCollection chan struct{}
 
@@ -403,6 +404,9 @@ func (r *TaskRunner) run() {
 // Store that the task has been destroyed and any associated error.
 r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
 
+// Store that task event that provides context on the task destroy.
+r.setState(structs.TaskStateDead, r.destroyEvent)
+
 r.runningLock.Lock()
 r.running = false
 r.runningLock.Unlock()
@@ -446,8 +450,8 @@ func (r *TaskRunner) run() {
 destroyed := r.destroy
 r.destroyLock.Unlock()
 if destroyed {
-r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name)
-r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
+r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
+r.setState(structs.TaskStateDead, r.destroyEvent)
 return
 }
 
@@ -459,7 +463,7 @@ func (r *TaskRunner) run() {
 }
 }
 
-// startTask creates the driver and start the task.
+// startTask creates the driver and starts the task.
 func (r *TaskRunner) startTask() error {
 // Create a driver
 driver, err := r.createDriver()
@@ -616,8 +620,9 @@ func (r *TaskRunner) Update(update *structs.Allocation) {
 }
 }
 
-// Destroy is used to indicate that the task context should be destroyed
-func (r *TaskRunner) Destroy() {
+// Destroy is used to indicate that the task context should be destroyed. The
+// event parameter provides a context for the destroy.
+func (r *TaskRunner) Destroy(event *structs.TaskEvent) {
 r.destroyLock.Lock()
 defer r.destroyLock.Unlock()
 
@@ -625,6 +630,7 @@ func (r *TaskRunner) Destroy() {
 return
 }
 r.destroy = true
+r.destroyEvent = event
 close(r.destroyCh)
 }
 
@@ -55,7 +55,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
 // we have a mock so that doesn't happen.
 task.Resources.Networks[0].ReservedPorts = []structs.Port{{"", 80}}
 
-allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID))
+allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID), task.Resources.DiskMB)
 allocDir.Build([]*structs.Task{task})
 
 ctx := driver.NewExecContext(allocDir, alloc.ID)
@@ -71,7 +71,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
 upd, tr := testTaskRunner(false)
 tr.MarkReceived()
 go tr.Run()
-defer tr.Destroy()
+defer tr.Destroy(structs.TaskKilled)
 defer tr.ctx.AllocDir.Destroy()
 
 select {
@@ -138,7 +138,7 @@ func TestTaskRunner_Destroy(t *testing.T) {
 }
 
 // Begin the tear down
-tr.Destroy()
+tr.Destroy(structs.TaskKilled)
 
 select {
 case <-tr.WaitCh():
@@ -171,7 +171,7 @@ func TestTaskRunner_Update(t *testing.T) {
 tr.task.Config["command"] = "/bin/sleep"
 tr.task.Config["args"] = []string{"100"}
 go tr.Run()
-defer tr.Destroy()
+defer tr.Destroy(structs.TaskKilled)
 defer tr.ctx.AllocDir.Destroy()
 
 // Update the task definition
@@ -225,7 +225,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
 tr.task.Config["command"] = "/bin/sleep"
 tr.task.Config["args"] = []string{"10"}
 go tr.Run()
-defer tr.Destroy()
+defer tr.Destroy(structs.TaskKilled)
 
 // Snapshot state
 time.Sleep(2 * time.Second)
@@ -240,7 +240,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
 t.Fatalf("err: %v", err)
 }
 go tr2.Run()
-defer tr2.Destroy()
+defer tr2.Destroy(structs.TaskKilled)
 
 // Destroy and wait
 testutil.WaitForResult(func() (bool, error) {
@@ -272,7 +272,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
 upd, tr := testTaskRunnerFromAlloc(false, alloc)
 tr.MarkReceived()
 go tr.Run()
-defer tr.Destroy()
+defer tr.Destroy(structs.TaskKilled)
 defer tr.ctx.AllocDir.Destroy()
 
 select {
@@ -337,7 +337,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
 upd, tr := testTaskRunnerFromAlloc(true, alloc)
 tr.MarkReceived()
 go tr.Run()
-defer tr.Destroy()
+defer tr.Destroy(structs.TaskKilled)
 defer tr.ctx.AllocDir.Destroy()
 
 select {
@@ -16,6 +16,7 @@ import (
 "time"
 
 "github.com/hashicorp/nomad/client/allocdir"
+"github.com/hashicorp/nomad/nomad/structs"
 "github.com/hashicorp/nomad/testutil"
 "github.com/ugorji/go/codec"
 )
@@ -448,7 +449,7 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir {
 t.Fatalf("failed to chmod dir: %v", err)
 }
 
-return allocdir.NewAllocDir(dir)
+return allocdir.NewAllocDir(dir, structs.DefaultResources().DiskMB)
 }
 
 type nopWriteCloser struct {
@@ -721,6 +721,10 @@ type Resources struct {
 Networks []*NetworkResource
 }
 
+const (
+BytesInMegabyte = 1024 * 1024
+)
+
 // DefaultResources returns the default resources for a task.
 func DefaultResources() *Resources {
 return &Resources{
@@ -731,6 +735,11 @@ func DefaultResources() *Resources {
 }
 }
 
+// DiskInBytes returns the amount of disk resources in bytes.
+func (r *Resources) DiskInBytes() int64 {
+return int64(r.DiskMB * BytesInMegabyte)
+}
+
 // Merge merges this resource with another resource.
 func (r *Resources) Merge(other *Resources) {
 if other.CPU != 0 {
@@ -2131,7 +2140,7 @@ func (ts *TaskState) Copy() *TaskState {
 return copy
 }
 
-// Failed returns if the task has has failed.
+// Failed returns true if the task has has failed.
 func (ts *TaskState) Failed() bool {
 l := len(ts.Events)
 if ts.State != TaskStateDead || l == 0 {
@@ -2139,7 +2148,7 @@ func (ts *TaskState) Failed() bool {
 }
 
 switch ts.Events[l-1].Type {
-case TaskNotRestarting, TaskArtifactDownloadFailed, TaskFailedValidation:
+case TaskDiskExceeded, TaskNotRestarting, TaskArtifactDownloadFailed, TaskFailedValidation:
 return true
 default:
 return false
@@ -2201,6 +2210,14 @@ const (
 // TaskArtifactDownloadFailed indicates that downloading the artifacts
 // failed.
 TaskArtifactDownloadFailed = "Failed Artifact Download"
+
+// TaskDiskExceeded indicates that one of the tasks in a taskgroup has
+// exceeded the requested disk resources.
+TaskDiskExceeded = "Disk Resources Exceeded"
+
+// TaskSiblingFailed indicates that a sibling task in the task group has
+// failed.
+TaskSiblingFailed = "Sibling task failed"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
@@ -2234,6 +2251,16 @@ type TaskEvent struct {
 
 // Validation fields
 ValidationError string // Validation error
+
+// The maximum allowed task disk size.
+DiskLimit int64
+
+// The recorded task disk size.
+DiskSize int64
+
+// Name of the sibling task that caused termination of the task that
+// the TaskEvent refers to.
+FailedSibling string
 }
 
 func (te *TaskEvent) GoString() string {
@@ -2316,6 +2343,21 @@ func (e *TaskEvent) SetKillTimeout(timeout time.Duration) *TaskEvent {
 return e
 }
 
+func (e *TaskEvent) SetDiskLimit(limit int64) *TaskEvent {
+e.DiskLimit = limit
+return e
+}
+
+func (e *TaskEvent) SetDiskSize(size int64) *TaskEvent {
+e.DiskSize = size
+return e
+}
+
+func (e *TaskEvent) SetFailedSibling(sibling string) *TaskEvent {
+e.FailedSibling = sibling
+return e
+}
+
 // TaskArtifact is an artifact to download before running the task.
 type TaskArtifact struct {
 // GetterSource is the source to download an artifact using go-getter
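
Downstream of the client changes, these setters let the disk-exceeded event carry both the configured limit and the measured size (checkResources builds exactly such an event), and adding TaskDiskExceeded to the Failed switch makes that kind of termination count as a task failure. A small illustration, assuming only the structs definitions shown in this diff; the 400 MB figure is made up.

package main

import (
    "fmt"

    "github.com/hashicorp/nomad/nomad/structs"
)

func main() {
    // Build the event the client emits when the shared alloc directory grows
    // past the allocation's disk request (the sizes here are illustrative).
    event := structs.NewTaskEvent(structs.TaskDiskExceeded).
        SetDiskLimit(structs.DefaultResources().DiskInBytes()).
        SetDiskSize(400 * structs.BytesInMegabyte)

    // A dead task whose most recent event is TaskDiskExceeded now reports as failed.
    state := &structs.TaskState{
        State:  structs.TaskStateDead,
        Events: []*structs.TaskEvent{event},
    }
    fmt.Println(state.Failed()) // true
}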