Merge branch 'master' into f-logrotator

Diptanu Choudhury 2016-02-09 12:25:44 -08:00
commit 9d6eed1fb4
31 changed files with 449 additions and 262 deletions

View File

@ -2,7 +2,7 @@
BACKWARDS INCOMPATIBILITIES:
* core: Improved restart policy with more user configuration [GH-594]
* core/cli: Print short identifiers [GH-675]
* core/cli: Print short identifiers [GH-760]
* core/consul: Validate service name doesn't include period [GH-770]
* core/jobspec: Variables/constraints interpreted using ${} notation [GH-675]
* client: Environment variable containing address for each allocated port

View File

@ -60,6 +60,7 @@ type Allocation struct {
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
}
// AllocationMetric is used to deserialize allocation metrics.
@ -93,6 +94,7 @@ type AllocationListStub struct {
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
}
// AllocIndexSort reverse sorts allocs by CreateIndex.

View File

@ -53,7 +53,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusDead {
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}
return true, nil
@ -77,7 +77,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusRunning {
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil

View File

@ -8,6 +8,7 @@ import (
"path/filepath"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -37,9 +38,6 @@ type AllocDir struct {
// TaskDirs is a mapping of task names to their non-shared directory.
TaskDirs map[string]string
// A list of locations the shared alloc has been mounted to.
Mounted []string
}
// AllocFileInfo holds information about a file inside the AllocDir
@ -67,13 +65,39 @@ func NewAllocDir(allocDir string) *AllocDir {
// Tears down the previously built directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
for _, m := range d.Mounted {
if err := d.unmountSharedDir(m); err != nil {
return fmt.Errorf("Failed to unmount shared directory: %v", err)
}
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return os.RemoveAll(d.AllocDir)
if err := os.RemoveAll(d.AllocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
func (d *AllocDir) UnmountAll() error {
var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
taskAlloc := filepath.Join(dir, SharedAllocName)
if d.pathExists(taskAlloc) {
if err := d.unmountSharedDir(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to unmount shared alloc dir %q: %v", taskAlloc, err))
}
if err := os.RemoveAll(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to delete shared alloc dir %q: %v", taskAlloc, err))
}
}
// Unmount dev/ and proc/ if they have been mounted.
d.unmountSpecialDirs(dir)
}
return mErr.ErrorOrNil()
}
// Given a list of tasks, build the correct alloc structure.
@ -248,7 +272,6 @@ func (d *AllocDir) MountSharedDir(task string) error {
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
}
d.Mounted = append(d.Mounted, taskLoc)
return nil
}
@ -325,3 +348,13 @@ func fileCopy(src, dst string, perm os.FileMode) error {
return nil
}
// pathExists is a helper function to check if the path exists.
func (d *AllocDir) pathExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
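
The Destroy/UnmountAll changes above stop bailing out on the first failure and instead collect every unmount and delete error with go-multierror, returning them together. A minimal standalone sketch of that aggregation pattern; the helper name and paths below are illustrative, not from the commit:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/hashicorp/go-multierror"
)

// cleanup mirrors the pattern used by Destroy/UnmountAll above: attempt every
// step, append each failure to a multierror, and report them all at the end.
func cleanup(paths []string) error {
	var mErr multierror.Error
	for _, p := range paths {
		if err := os.RemoveAll(p); err != nil {
			mErr.Errors = append(mErr.Errors, fmt.Errorf("failed to delete %q: %v", p, err))
		}
	}
	return mErr.ErrorOrNil()
}

func main() {
	err := cleanup([]string{
		filepath.Join(os.TempDir(), "example-a"),
		filepath.Join(os.TempDir(), "example-b"),
	})
	fmt.Println("aggregated error:", err) // <nil> when every step succeeded
}
```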

View File

@ -13,3 +13,14 @@ func (d *AllocDir) mountSharedDir(dir string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return syscall.Unlink(dir)
}
// MountSpecialDirs mounts the dev and proc file system on the chroot of the
// task. It's a no-op on darwin.
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
return nil
}

View File

@ -1,8 +1,12 @@
package allocdir
import (
"fmt"
"os"
"path/filepath"
"syscall"
"github.com/hashicorp/go-multierror"
)
// Bind mounts the shared directory into the task directory. Must be root to
@ -18,3 +22,62 @@ func (d *AllocDir) mountSharedDir(taskDir string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return syscall.Unmount(dir, 0)
}
// MountSpecialDirs mounts the dev and proc file system from the host to the
// chroot
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
// Mount dev
dev := filepath.Join(taskDir, "dev")
if !d.pathExists(dev) {
if err := os.Mkdir(dev, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
}
if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
}
}
// Mount proc
proc := filepath.Join(taskDir, "proc")
if !d.pathExists(proc) {
if err := os.Mkdir(proc, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
}
if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
}
}
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
errs := new(multierror.Error)
dev := filepath.Join(taskDir, "dev")
if d.pathExists(dev) {
if err := syscall.Unmount(dev, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
}
if err := os.RemoveAll(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
}
}
// Unmount proc.
proc := filepath.Join(taskDir, "proc")
if d.pathExists(proc) {
if err := syscall.Unmount(proc, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
}
if err := os.RemoveAll(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err))
}
}
return errs.ErrorOrNil()
}

View File

@ -23,3 +23,14 @@ func (d *AllocDir) dropDirPermissions(path string) error {
func (d *AllocDir) unmountSharedDir(dir string) error {
return nil
}
// MountSpecialDirs mounts the dev and proc file system on the chroot of the
// task. It's a no-op on windows.
func (d *AllocDir) MountSpecialDirs(taskDir string) error {
return nil
}
// unmountSpecialDirs unmounts the dev and proc file system from the chroot
func (d *AllocDir) unmountSpecialDirs(taskDir string) error {
return nil
}

View File

@ -57,11 +57,13 @@ type Config struct {
// Node provides the base node
Node *structs.Node
// ExecutorMaxPort defines the highest port a plugin process can use
ExecutorMaxPort int
// ClientMaxPort is the upper range of the ports that the client uses for
// communicating with plugin subsystems
ClientMaxPort uint
// ExecutorMinPort defines the lowest port a plugin process can use
ExecutorMinPort int
// ClientMinPort is the lower range of the ports that the client uses for
// communicating with plugin subsystems
ClientMinPort uint
// Options provides arbitrary key-value configuration for nomad internals,
// like fingerprinters and drivers. The format is:

View File

@ -89,8 +89,14 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
max := d.config.MaxKillTimeout.Nanoseconds()
desired := task.KillTimeout.Nanoseconds()
// Make the minimum time between signal and kill 1 second.
if desired == 0 {
desired = (1 * time.Second).Nanoseconds()
}
if desired < max {
return task.KillTimeout
return time.Duration(desired)
}
return d.config.MaxKillTimeout
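
The KillTimeout change above gives a zero timeout a one-second floor and caps the result at MaxKillTimeout. A small standalone sketch of the same clamping rule (the helper name is made up for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// clampKillTimeout mirrors the logic above: a zero timeout is raised to one
// second, and anything at or above max is capped at max.
func clampKillTimeout(desired, max time.Duration) time.Duration {
	if desired == 0 {
		desired = 1 * time.Second
	}
	if desired < max {
		return desired
	}
	return max
}

func main() {
	fmt.Println(clampKillTimeout(0, 30*time.Second))              // 1s
	fmt.Println(clampKillTimeout(10*time.Second, 30*time.Second)) // 10s
	fmt.Println(clampKillTimeout(2*time.Minute, 30*time.Second))  // 30s
}
```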

View File

@ -46,6 +46,7 @@ func testConfig() *config.Config {
conf := &config.Config{}
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
conf.MaxKillTimeout = 10 * time.Second
return conf
}

View File

@ -9,7 +9,9 @@ import (
"syscall"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -17,8 +19,6 @@ import (
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// ExecDriver fork/execs tasks using as many of the underlying OS's isolation
@ -36,14 +36,15 @@ type ExecDriverConfig struct {
// execHandle is returned from Start/Open as a handle to the PID
type execHandle struct {
pluginClient *plugin.Client
executor executor.Executor
groups *cgroupConfig.Cgroup
userPid int
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
pluginClient *plugin.Client
executor executor.Executor
isolationConfig *executor.IsolationConfig
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}
// NewExecDriver is used to create a new exec driver
@ -110,7 +111,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -133,24 +134,27 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Return a driver handle
h := &execHandle{
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
groups: &ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
userPid: ps.Pid,
executor: exec,
allocDir: ctx.AllocDir,
isolationConfig: ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
}
type execId struct {
KillTimeout time.Duration
UserPid int
Groups *cgroupConfig.Cgroup
PluginConfig *ExecutorReattachConfig
KillTimeout time.Duration
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *executor.IsolationConfig
PluginConfig *ExecutorReattachConfig
}
func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@ -162,28 +166,36 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
executor, client, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, client, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
merrs := new(multierror.Error)
merrs.Errors = append(merrs.Errors, err)
d.logger.Println("[ERROR] driver.exec: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
d.logger.Printf("[ERROR] driver.exec: error destroying plugin and userpid: %v", e)
merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
}
if e := destroyCgroup(id.Groups); e != nil {
d.logger.Printf("[ERROR] driver.exec: %v", e)
if id.IsolationConfig != nil {
if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e))
}
}
return nil, fmt.Errorf("error connecting to plugin: %v", err)
if e := ctx.AllocDir.UnmountAll(); e != nil {
merrs.Errors = append(merrs.Errors, e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
}
// Return a driver handle
h := &execHandle{
pluginClient: client,
executor: executor,
userPid: id.UserPid,
groups: id.Groups,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: client,
executor: exec,
userPid: id.UserPid,
allocDir: id.AllocDir,
isolationConfig: id.IsolationConfig,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
@ -191,10 +203,11 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *execHandle) ID() string {
id := execId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
Groups: h.groups,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
IsolationConfig: h.isolationConfig,
}
data, err := json.Marshal(id)
@ -217,7 +230,10 @@ func (h *execHandle) Update(task *structs.Task) error {
}
func (h *execHandle) Kill() error {
h.executor.ShutDown()
if err := h.executor.ShutDown(); err != nil {
return fmt.Errorf("executor Shutdown failed: %v", err)
}
select {
case <-h.doneCh:
return nil
@ -225,15 +241,35 @@ func (h *execHandle) Kill() error {
if h.pluginClient.Exited() {
return nil
}
err := h.executor.Exit()
return err
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}
return nil
}
}
func (h *execHandle) run() {
ps, err := h.executor.Wait()
close(h.doneCh)
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
// If the exit code is 0 but we got an error, the plugin never connected and
// doesn't know the state of the user process. Kill the user process so that,
// when a new executor is created on restart, the new user process doesn't
// collide with resources the old user pid might still be holding.
if ps.ExitCode == 0 && err != nil {
if h.isolationConfig != nil {
if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil {
h.logger.Printf("[ERROR] driver.exec: destroying cgroup failed while killing cgroup: %v", e)
}
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
Err: err}
close(h.waitCh)
h.pluginClient.Kill()
}

View File

@ -129,7 +129,8 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
userProc, err := os.FindProcess(id.UserPid)
err = userProc.Signal(syscall.Signal(0))
if err != nil {
if err == nil {
t.Fatalf("expected user process to die")
}
}
@ -321,9 +322,10 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
Name: "sleep",
Config: map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"10"},
"args": []string{"100"},
},
Resources: basicResources,
Resources: basicResources,
KillTimeout: 10 * time.Second,
}
driverCtx, execCtx := testDriverContexts(task)

View File

@ -61,12 +61,18 @@ type ExecCommand struct {
Args []string
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to apply resource constraints and isolation to the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
Pid int
ExitCode int
Signal int
IsolationConfig cgroupConfig.Cgroup
IsolationConfig *IsolationConfig
Time time.Time
}
@ -168,9 +174,9 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting command: %v", err)
}
go e.wait()
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: *e.groups, Time: time.Now()}, nil
ic := &IsolationConfig{Cgroup: e.groups}
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
}
// Wait waits until a process has exited and returns its exit code and errors
@ -212,11 +218,19 @@ func (e *UniversalExecutor) wait() {
e.removeChrootMounts()
}
if e.ctx.ResourceLimits {
e.destroyCgroup()
e.lock.Lock()
DestroyCgroup(e.groups)
e.lock.Unlock()
}
e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}
}
var (
// finishedErr is the error message received when trying to kill an already
// exited process.
finishedErr = "os: process already finished"
)
// Exit cleans up the alloc directory, destroys cgroups and kills the user
// process
func (e *UniversalExecutor) Exit() error {
@ -224,10 +238,11 @@ func (e *UniversalExecutor) Exit() error {
if e.cmd.Process != nil {
proc, err := os.FindProcess(e.cmd.Process.Pid)
if err != nil {
e.logger.Printf("[ERROR] can't find process with pid: %v, err: %v", e.cmd.Process.Pid, err)
}
if err := proc.Kill(); err != nil {
e.logger.Printf("[ERROR] can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err)
e.logger.Printf("[ERROR] executor: can't find process with pid: %v, err: %v",
e.cmd.Process.Pid, err)
} else if err := proc.Kill(); err != nil && err.Error() != finishedErr {
merr.Errors = append(merr.Errors,
fmt.Errorf("can't kill process with pid: %v, err: %v", e.cmd.Process.Pid, err))
}
}
@ -237,9 +252,11 @@ func (e *UniversalExecutor) Exit() error {
}
}
if e.ctx.ResourceLimits {
if err := e.destroyCgroup(); err != nil {
e.lock.Lock()
if err := DestroyCgroup(e.groups); err != nil {
merr.Errors = append(merr.Errors, err)
}
e.lock.Unlock()
}
return merr.ErrorOrNil()
}
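
Exit above treats the "os: process already finished" error from Kill as benign rather than as a failure. A standalone illustration of that check, using a throwaway sleep command rather than the executor's real process handling:

```go
package main

import (
	"fmt"
	"os/exec"
)

// killIgnoringFinished mirrors the check above: an "already finished" error
// from Kill is swallowed because the process is gone anyway.
func killIgnoringFinished(cmd *exec.Cmd) error {
	if cmd.Process == nil {
		return nil
	}
	if err := cmd.Process.Kill(); err != nil && err.Error() != "os: process already finished" {
		return fmt.Errorf("can't kill process with pid: %v, err: %v", cmd.Process.Pid, err)
	}
	return nil
}

func main() {
	cmd := exec.Command("sleep", "1")
	if err := cmd.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	_ = cmd.Wait() // the process has already exited by the time we try to kill it
	fmt.Println(killIgnoringFinished(cmd)) // <nil>: the stale kill is ignored
}
```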
@ -262,11 +279,12 @@ func (e *UniversalExecutor) ShutDown() error {
return nil
}
// configureTaskDir sets the task dir in the executor
func (e *UniversalExecutor) configureTaskDir() error {
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName]
e.taskDir = taskDir
if !ok {
return fmt.Errorf("Couldn't find task directory for task %v", e.ctx.TaskName)
return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName)
}
e.cmd.Dir = taskDir
return nil

View File

@ -2,11 +2,15 @@
package executor
import (
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
func (e *UniversalExecutor) configureChroot() error {
return nil
}
func (e *UniversalExecutor) destroyCgroup() error {
func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
return nil
}

View File

@ -46,11 +46,11 @@ func (e *UniversalExecutor) configureIsolation() error {
return fmt.Errorf("error creating cgroups: %v", err)
}
if err := e.applyLimits(os.Getpid()); err != nil {
if er := e.destroyCgroup(); er != nil {
e.logger.Printf("[ERROR] error destroying cgroup: %v", er)
if er := DestroyCgroup(e.groups); er != nil {
e.logger.Printf("[ERROR] executor: error destroying cgroup: %v", er)
}
if er := e.removeChrootMounts(); er != nil {
e.logger.Printf("[ERROR] error removing chroot: %v", er)
e.logger.Printf("[ERROR] executor: error removing chroot: %v", er)
}
return fmt.Errorf("error entering the plugin process in the cgroup: %v:", err)
}
@ -65,11 +65,11 @@ func (e *UniversalExecutor) applyLimits(pid int) error {
}
// Entering the process in the cgroup
manager := e.getCgroupManager(e.groups)
manager := getCgroupManager(e.groups)
if err := manager.Apply(pid); err != nil {
e.logger.Printf("[ERROR] unable to join cgroup: %v", err)
e.logger.Printf("[ERROR] executor: unable to join cgroup: %v", err)
if err := e.Exit(); err != nil {
e.logger.Printf("[ERROR] unable to kill process: %v", err)
e.logger.Printf("[ERROR] executor: unable to kill process: %v", err)
}
return err
}
@ -144,16 +144,6 @@ func (e *UniversalExecutor) runAs(userid string) error {
return nil
}
// pathExists is a helper function to check if the path exists.
func (e *UniversalExecutor) pathExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// configureChroot configures a chroot
func (e *UniversalExecutor) configureChroot() error {
allocDir := e.ctx.AllocDir
@ -165,40 +155,19 @@ func (e *UniversalExecutor) configureChroot() error {
return err
}
// Mount dev
dev := filepath.Join(e.taskDir, "dev")
if !e.pathExists(dev) {
if err := os.Mkdir(dev, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", dev, err)
}
if err := syscall.Mount("none", dev, "devtmpfs", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /dev to %v: %v", dev, err)
}
}
// Mount proc
proc := filepath.Join(e.taskDir, "proc")
if !e.pathExists(proc) {
if err := os.Mkdir(proc, 0777); err != nil {
return fmt.Errorf("Mkdir(%v) failed: %v", proc, err)
}
if err := syscall.Mount("none", proc, "proc", syscall.MS_RDONLY, ""); err != nil {
return fmt.Errorf("Couldn't mount /proc to %v: %v", proc, err)
}
}
// Set the tasks AllocDir environment variable.
e.ctx.TaskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName)).SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal)).Build()
if e.cmd.SysProcAttr == nil {
e.cmd.SysProcAttr = &syscall.SysProcAttr{}
}
e.cmd.SysProcAttr.Chroot = e.taskDir
e.cmd.Dir = "/"
if err := allocDir.MountSpecialDirs(e.taskDir); err != nil {
return err
}
return nil
}
@ -208,80 +177,44 @@ func (e *UniversalExecutor) removeChrootMounts() error {
// Prevent a race between Wait/ForceStop
e.lock.Lock()
defer e.lock.Unlock()
// Unmount dev.
errs := new(multierror.Error)
dev := filepath.Join(e.taskDir, "dev")
if e.pathExists(dev) {
if err := syscall.Unmount(dev, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev (%v): %v", dev, err))
}
if err := os.RemoveAll(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory (%v): %v", dev, err))
}
}
// Unmount proc.
proc := filepath.Join(e.taskDir, "proc")
if e.pathExists(proc) {
if err := syscall.Unmount(proc, 0); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc (%v): %v", proc, err))
}
if err := os.RemoveAll(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory (%v): %v", dev, err))
}
}
return errs.ErrorOrNil()
return e.ctx.AllocDir.UnmountAll()
}
// DestroyCgroup kills all processes in the cgroup and removes the cgroup
// configuration from the host.
func (e *UniversalExecutor) destroyCgroup() error {
if e.groups == nil {
func DestroyCgroup(groups *cgroupConfig.Cgroup) error {
merrs := new(multierror.Error)
if groups == nil {
return fmt.Errorf("Can't destroy: cgroup configuration empty")
}
// Prevent a race between Wait/ForceStop
e.lock.Lock()
defer e.lock.Unlock()
manager := e.getCgroupManager(e.groups)
pids, err := manager.GetPids()
if err != nil {
return fmt.Errorf("Failed to get pids in the cgroup %v: %v", e.groups.Name, err)
}
errs := new(multierror.Error)
for _, pid := range pids {
process, err := os.FindProcess(pid)
if err != nil {
multierror.Append(errs, fmt.Errorf("Failed to find Pid %v: %v", pid, err))
continue
}
if err := process.Kill(); err != nil && err.Error() != "os: process already finished" {
multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err))
continue
manager := getCgroupManager(groups)
if pids, perr := manager.GetPids(); perr == nil {
for _, pid := range pids {
proc, err := os.FindProcess(pid)
if err != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error finding process %v: %v", pid, err))
} else {
if e := proc.Kill(); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("error killing process %v: %v", pid, e))
}
}
}
}
// Remove the cgroup.
if err := manager.Destroy(); err != nil {
multierror.Append(errs, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
multierror.Append(merrs, fmt.Errorf("Failed to delete the cgroup directories: %v", err))
}
if len(errs.Errors) != 0 {
return fmt.Errorf("Failed to destroy cgroup: %v", errs)
if len(merrs.Errors) != 0 {
return fmt.Errorf("errors while destroying cgroup: %v", merrs)
}
return nil
}
// getCgroupManager returns the correct libcontainer cgroup manager.
func (e *UniversalExecutor) getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
func getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager {
var manager cgroups.Manager
manager = &cgroupFs.Manager{Cgroups: groups}
if systemd.UseSystemd() {

View File

@ -19,7 +19,7 @@ var HandshakeConfig = plugin.HandshakeConfig{
func GetPluginMap(w io.Writer) map[string]plugin.Plugin {
p := new(ExecutorPlugin)
p.logger = log.New(w, "executor-plugin-server:", log.LstdFlags)
p.logger = log.New(w, "", log.LstdFlags)
return map[string]plugin.Plugin{"executor": p}
}
@ -115,10 +115,14 @@ func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *inter
type ExecutorPlugin struct {
logger *log.Logger
Impl *ExecutorRPCServer
}
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
return &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)}, nil
if p.Impl == nil {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger)}
}
return p.Impl, nil
}
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {

View File

@ -12,10 +12,11 @@ import (
"syscall"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/mitchellh/mapstructure"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -41,11 +42,13 @@ type JavaDriverConfig struct {
// javaHandle is returned from Start/Open as a handle to the PID
type javaHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
groups *cgroupConfig.Cgroup
pluginClient *plugin.Client
userPid int
executor executor.Executor
isolationConfig *executor.IsolationConfig
taskDir string
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
@ -155,7 +158,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -176,14 +179,16 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Return a driver handle
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
groups: &ps.IsolationConfig,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
@ -191,10 +196,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
type javaId struct {
KillTimeout time.Duration
PluginConfig *ExecutorReattachConfig
Groups *cgroupConfig.Cgroup
UserPid int
KillTimeout time.Duration
PluginConfig *ExecutorReattachConfig
IsolationConfig *executor.IsolationConfig
TaskDir string
AllocDir *allocdir.AllocDir
UserPid int
}
func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@ -206,29 +213,38 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
merrs := new(multierror.Error)
merrs.Errors = append(merrs.Errors, err)
d.logger.Println("[ERROR] driver.java: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
d.logger.Printf("[ERROR] driver.java: error destroying plugin and userpid: %v", e)
merrs.Errors = append(merrs.Errors, fmt.Errorf("error destroying plugin and userpid: %v", e))
}
if e := destroyCgroup(id.Groups); e != nil {
d.logger.Printf("[ERROR] driver.exec: %v", e)
if id.IsolationConfig != nil {
if e := executor.DestroyCgroup(id.IsolationConfig.Cgroup); e != nil {
merrs.Errors = append(merrs.Errors, fmt.Errorf("destroying cgroup failed: %v", e))
}
}
if e := ctx.AllocDir.UnmountAll(); e != nil {
merrs.Errors = append(merrs.Errors, e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", err)
return nil, fmt.Errorf("error connecting to plugin: %v", merrs.ErrorOrNil())
}
// Return a driver handle
h := &javaHandle{
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
groups: id.Groups,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
pluginClient: pluginClient,
executor: exec,
userPid: id.UserPid,
isolationConfig: id.IsolationConfig,
taskDir: id.TaskDir,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
@ -237,10 +253,12 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
func (h *javaHandle) ID() string {
id := javaId{
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
Groups: h.groups,
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
TaskDir: h.taskDir,
AllocDir: h.allocDir,
IsolationConfig: h.isolationConfig,
}
data, err := json.Marshal(id)
@ -275,6 +293,20 @@ func (h *javaHandle) Kill() error {
func (h *javaHandle) run() {
ps, err := h.executor.Wait()
close(h.doneCh)
if ps.ExitCode == 0 && err != nil {
if h.isolationConfig != nil {
if e := executor.DestroyCgroup(h.isolationConfig.Cgroup); e != nil {
h.logger.Printf("[ERROR] driver.java: destroying cgroup failed while killing cgroup: %v", e)
}
} else {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.java: error killing user process: %v", e)
}
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.java: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)
h.pluginClient.Kill()

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -46,6 +47,7 @@ type qemuHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
allocDir *allocdir.AllocDir
killTimeout time.Duration
logger *log.Logger
waitCh chan *cstructs.WaitResult
@ -197,7 +199,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -220,6 +222,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
pluginClient: pluginClient,
executor: exec,
userPid: ps.Pid,
allocDir: ctx.AllocDir,
killTimeout: d.DriverContext.KillTimeout(task),
logger: d.logger,
doneCh: make(chan struct{}),
@ -234,6 +237,7 @@ type qemuId struct {
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
AllocDir *allocdir.AllocDir
}
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@ -246,7 +250,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
Reattach: id.PluginConfig.PluginConfig(),
}
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
d.logger.Println("[ERROR] driver.qemu: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
@ -260,6 +264,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
pluginClient: pluginClient,
executor: executor,
userPid: id.UserPid,
allocDir: id.AllocDir,
logger: d.logger,
killTimeout: id.KillTimeout,
doneCh: make(chan struct{}),
@ -274,6 +279,7 @@ func (h *qemuHandle) ID() string {
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
data, err := json.Marshal(id)
@ -309,6 +315,14 @@ func (h *qemuHandle) Kill() error {
func (h *qemuHandle) run() {
ps, err := h.executor.Wait()
if ps.ExitCode == 0 && err != nil {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.qemu: error killing user process: %v", e)
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.qemu: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
close(h.doneCh)
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -38,6 +39,7 @@ type rawExecHandle struct {
userPid int
executor executor.Executor
killTimeout time.Duration
allocDir *allocdir.AllocDir
logger *log.Logger
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
@ -103,7 +105,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -127,6 +129,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
executor: exec,
userPid: ps.Pid,
killTimeout: d.DriverContext.KillTimeout(task),
allocDir: ctx.AllocDir,
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
@ -139,6 +142,7 @@ type rawExecId struct {
KillTimeout time.Duration
UserPid int
PluginConfig *ExecutorReattachConfig
AllocDir *allocdir.AllocDir
}
func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
@ -150,7 +154,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
pluginConfig := &plugin.ClientConfig{
Reattach: id.PluginConfig.PluginConfig(),
}
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput)
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
d.logger.Println("[ERROR] driver.raw_exec: error connecting to plugin so destroying plugin pid and user pid")
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
@ -166,6 +170,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
userPid: id.UserPid,
logger: d.logger,
killTimeout: id.KillTimeout,
allocDir: id.AllocDir,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
}
@ -178,6 +183,7 @@ func (h *rawExecHandle) ID() string {
KillTimeout: h.killTimeout,
PluginConfig: NewExecutorReattachConfig(h.pluginClient.ReattachConfig()),
UserPid: h.userPid,
AllocDir: h.allocDir,
}
data, err := json.Marshal(id)
@ -212,6 +218,14 @@ func (h *rawExecHandle) Kill() error {
func (h *rawExecHandle) run() {
ps, err := h.executor.Wait()
close(h.doneCh)
if ps.ExitCode == 0 && err != nil {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERROR] driver.raw_exec: error killing user process: %v", e)
}
if e := h.allocDir.UnmountAll(); e != nil {
h.logger.Printf("[ERROR] driver.raw_exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err}
close(h.waitCh)
h.pluginClient.Kill()

View File

@ -1,10 +1,12 @@
package driver
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"testing"
@ -287,7 +289,7 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
Name: "sleep",
Config: map[string]interface{}{
"command": testtask.Path(),
"args": []string{"sleep", "15s"},
"args": []string{"sleep", "45s"},
},
Resources: basicResources,
}

View File

@ -7,17 +7,24 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
)
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) {
func createExecutor(config *plugin.ClientConfig, w io.Writer, clientConfig *config.Config) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
config.MinPort = clientConfig.ClientMinPort
// Set the setsid flag on the plugin process so that it doesn't get signals
// sent to the Nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {

View File

@ -3,12 +3,6 @@ package driver
import (
"os/exec"
"syscall"
"fmt"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/systemd"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
@ -20,21 +14,3 @@ func isolateCommand(cmd *exec.Cmd) {
}
cmd.SysProcAttr.Setsid = true
}
// destroyCgroup destroys a cgroup and thereby killing all the processes in that
// group
func destroyCgroup(group *cgroupConfig.Cgroup) error {
if group == nil {
return nil
}
var manager cgroups.Manager
manager = &cgroupFs.Manager{Cgroups: group}
if systemd.UseSystemd() {
manager = &systemd.Manager{Cgroups: group}
}
if err := manager.Destroy(); err != nil {
return fmt.Errorf("failed to destroy cgroup: %v", err)
}
return nil
}

View File

@ -5,8 +5,6 @@ package driver
import (
"os/exec"
"syscall"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
@ -18,7 +16,3 @@ func isolateCommand(cmd *exec.Cmd) {
}
cmd.SysProcAttr.Setsid = true
}
func destroyCgroup(group *cgroupConfig.Cgroup) error {
return nil
}

View File

@ -2,14 +2,8 @@ package driver
import (
"os/exec"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// TODO Figure out if this is needed in Windows
func isolateCommand(cmd *exec.Cmd) {
}
func destroyCgroup(group *cgroupConfig.Cgroup) error {
return nil
}

View File

@ -216,6 +216,8 @@ func (a *Agent) setupClient() error {
}
conf.MaxKillTimeout = dur
}
conf.ClientMaxPort = a.config.Client.ClientMaxPort
conf.ClientMinPort = a.config.Client.ClientMinPort
// Setup the node
conf.Node = new(structs.Node)
@ -242,14 +244,8 @@ func (a *Agent) setupClient() error {
// Reserve some ports for the plugins
if runtime.GOOS == "windows" {
deviceName, err := a.findLoopbackDevice()
if conf.ExecutorMaxPort == 0 {
conf.ExecutorMaxPort = 15000
}
if conf.ExecutorMinPort == 0 {
conf.ExecutorMinPort = 14000
}
if err != nil {
return fmt.Errorf("error finding the device name for the ip 127.0.0.1: %v", err)
return fmt.Errorf("error finding the device name for loopback: %v", err)
}
var nr *structs.NetworkResource
for _, n := range conf.Node.Reserved.Networks {
@ -263,10 +259,9 @@ func (a *Agent) setupClient() error {
ReservedPorts: make([]structs.Port, 0),
}
}
for i := conf.ExecutorMinPort; i <= conf.ExecutorMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: i})
for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
}
}
// Create the client

View File

@ -159,6 +159,14 @@ type ClientConfig struct {
// MaxKillTimeout allows capping the user-specifiable KillTimeout.
MaxKillTimeout string `hcl:"max_kill_timeout"`
// ClientMaxPort is the upper range of the ports that the client uses for
// communicating with plugin subsystems
ClientMaxPort uint `hcl:"client_max_port"`
// ClientMinPort is the lower range of the ports that the client uses for
// communicating with plugin subsystems
ClientMinPort uint `hcl:"client_min_port"`
}
// ServerConfig is configuration specific to the server mode
@ -288,6 +296,8 @@ func DefaultConfig() *Config {
Enabled: false,
NetworkSpeed: 100,
MaxKillTimeout: "30s",
ClientMinPort: 14000,
ClientMaxPort: 19000,
},
Server: &ServerConfig{
Enabled: false,
@ -505,6 +515,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.MaxKillTimeout != "" {
result.MaxKillTimeout = b.MaxKillTimeout
}
if b.ClientMaxPort != 0 {
result.ClientMaxPort = b.ClientMaxPort
}
if b.ClientMinPort != 0 {
result.ClientMinPort = b.ClientMinPort
}
// Add the servers
result.Servers = append(result.Servers, b.Servers...)

View File

@ -104,6 +104,8 @@ func TestConfig_Merge(t *testing.T) {
"foo": "bar",
"baz": "zip",
},
ClientMaxPort: 20000,
ClientMinPort: 22000,
NetworkSpeed: 105,
MaxKillTimeout: "50s",
},

View File

@ -127,6 +127,15 @@ func (s *Server) applyPlan(result *structs.PlanResult, snap *state.StateSnapshot
}
req.Alloc = append(req.Alloc, result.FailedAllocs...)
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
// Dispatch the Raft transaction
future, err := s.raftApplyFuture(structs.AllocUpdateRequestType, &req)
if err != nil {
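
The hunk above stamps CreateTime only when it is still zero, so re-applying a plan never moves the original scheduling timestamp. A minimal sketch of that set-once behaviour (the alloc type below is a stand-in, not structs.Allocation):

```go
package main

import (
	"fmt"
	"time"
)

// alloc is a stand-in carrying only the field the hunk above touches.
type alloc struct {
	CreateTime int64
}

// stampCreateTime writes the timestamp only if it has not been set before,
// mirroring the loop in applyPlan above.
func stampCreateTime(allocs []*alloc) {
	now := time.Now().UTC().UnixNano()
	for _, a := range allocs {
		if a.CreateTime == 0 {
			a.CreateTime = now
		}
	}
}

func main() {
	a := &alloc{}
	stampCreateTime([]*alloc{a})
	first := a.CreateTime
	stampCreateTime([]*alloc{a})       // second application is a no-op
	fmt.Println(a.CreateTime == first) // true
}
```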

View File

@ -1794,6 +1794,10 @@ type Allocation struct {
// AllocModifyIndex is not updated when the client updates allocations. This
// lets the client pull only the allocs updated by the server.
AllocModifyIndex uint64
// CreateTime is the time the allocation has finished scheduling and been
// verified by the plan applier.
CreateTime int64
}
func (a *Allocation) Copy() *Allocation {
@ -1840,6 +1844,7 @@ func (a *Allocation) Stub() *AllocListStub {
TaskStates: a.TaskStates,
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
CreateTime: a.CreateTime,
}
}
@ -1887,6 +1892,7 @@ type AllocListStub struct {
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
}
// AllocMetric is used to track various metrics while attempting

View File

@ -246,7 +246,7 @@ be specified using the `?region=` query parameter.
* `TaskStateRunning` - The task is currently running.
* `TaskStateDead` - The task is dead and will not run again.
<p>The latest 10 events are stored per task. Each event is timestamped (unix seconds)
<p>The latest 10 events are stored per task. Each event is timestamped (unix nanoseconds)
and has one of the following types:</p>
* `Driver Failure` - The task could not be started due to a failure in the

View File

@ -24,7 +24,7 @@ be specified using the `?region=` query parameter.
<dd>GET</dd>
<dt>URL</dt>
<dd>`/v1/evaluations`</dd>
<dd>`/v1/evaluation/<id>`</dd>
<dt>Parameters</dt>
<dd>