Merge pull request #878 from hashicorp/b-kill-timeout-update
client: Updating kill timeout adheres to operator specified maximum
This commit is contained in:
commit
f1c9d11c55
|
@ -91,11 +91,12 @@ func (c *DockerDriverConfig) Validate() error {
|
|||
}
|
||||
|
||||
type dockerPID struct {
|
||||
Version string
|
||||
ImageID string
|
||||
ContainerID string
|
||||
KillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
Version string
|
||||
ImageID string
|
||||
ContainerID string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
}
|
||||
|
||||
type DockerHandle struct {
|
||||
|
@ -109,6 +110,7 @@ type DockerHandle struct {
|
|||
containerID string
|
||||
version string
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
@ -624,6 +626,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
|||
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
logCollector: logCollector,
|
||||
|
@ -634,7 +637,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
|||
imageID: dockerImage.ID,
|
||||
containerID: container.ID,
|
||||
version: d.config.Version,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
@ -703,6 +707,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
|||
containerID: pid.ContainerID,
|
||||
version: pid.Version,
|
||||
killTimeout: pid.KillTimeout,
|
||||
maxKillTimeout: pid.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
@ -713,11 +718,12 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
|||
func (h *DockerHandle) ID() string {
|
||||
// Return a handle to the PID
|
||||
pid := dockerPID{
|
||||
Version: h.version,
|
||||
ImageID: h.imageID,
|
||||
ContainerID: h.containerID,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
Version: h.version,
|
||||
ImageID: h.imageID,
|
||||
ContainerID: h.containerID,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
}
|
||||
data, err := json.Marshal(pid)
|
||||
if err != nil {
|
||||
|
@ -736,7 +742,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *DockerHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
|
||||
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
|
||||
}
|
||||
|
|
|
@ -152,18 +152,19 @@ func TestDockerDriver_Handle(t *testing.T) {
|
|||
defer pluginClient.Kill()
|
||||
|
||||
h := &DockerHandle{
|
||||
version: "version",
|
||||
imageID: "imageid",
|
||||
logCollector: logCollector,
|
||||
pluginClient: pluginClient,
|
||||
containerID: "containerid",
|
||||
killTimeout: 5 * time.Nanosecond,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
version: "version",
|
||||
imageID: "imageid",
|
||||
logCollector: logCollector,
|
||||
pluginClient: pluginClient,
|
||||
containerID: "containerid",
|
||||
killTimeout: 5 * time.Nanosecond,
|
||||
maxKillTimeout: 15 * time.Nanosecond,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
||||
actual := h.ID()
|
||||
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
|
||||
expected := fmt.Sprintf("DOCKER:{\"Version\":\"version\",\"ImageID\":\"imageid\",\"ContainerID\":\"containerid\",\"KillTimeout\":5,\"MaxKillTimeout\":15,\"PluginConfig\":{\"Pid\":%d,\"AddrNet\":\"unix\",\"AddrName\":\"%s\"}}",
|
||||
pluginClient.ReattachConfig().Pid, pluginClient.ReattachConfig().Addr.String())
|
||||
if actual != expected {
|
||||
t.Errorf("Expected `%s`, found `%s`", expected, actual)
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"log"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
|
@ -84,24 +83,6 @@ func NewDriverContext(taskName string, config *config.Config, node *structs.Node
|
|||
}
|
||||
}
|
||||
|
||||
// KillTimeout returns the timeout that should be used for the task between
|
||||
// signaling and killing the task.
|
||||
func (d *DriverContext) KillTimeout(task *structs.Task) time.Duration {
|
||||
max := d.config.MaxKillTimeout.Nanoseconds()
|
||||
desired := task.KillTimeout.Nanoseconds()
|
||||
|
||||
// Make the minimum time between signal and kill, 1 second.
|
||||
if desired == 0 {
|
||||
desired = (1 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
if desired < max {
|
||||
return time.Duration(desired)
|
||||
}
|
||||
|
||||
return d.config.MaxKillTimeout
|
||||
}
|
||||
|
||||
// DriverHandle is an opaque handle into a driver used for task
|
||||
// manipulation
|
||||
type DriverHandle interface {
|
||||
|
|
|
@ -66,24 +66,6 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
|
|||
return driverCtx, execCtx
|
||||
}
|
||||
|
||||
func TestDriver_KillTimeout(t *testing.T) {
|
||||
expected := 1 * time.Second
|
||||
task := &structs.Task{Name: "foo", KillTimeout: expected}
|
||||
ctx, _ := testDriverContexts(task)
|
||||
ctx.config.MaxKillTimeout = 10 * time.Second
|
||||
|
||||
if actual := ctx.KillTimeout(task); expected != actual {
|
||||
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
|
||||
}
|
||||
|
||||
expected = 10 * time.Second
|
||||
task = &structs.Task{KillTimeout: 11 * time.Second}
|
||||
|
||||
if actual := ctx.KillTimeout(task); expected != actual {
|
||||
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDriver_GetTaskEnv(t *testing.T) {
|
||||
t.Parallel()
|
||||
task := &structs.Task{
|
||||
|
|
|
@ -42,6 +42,7 @@ type execHandle struct {
|
|||
userPid int
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
|
@ -134,13 +135,15 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
d.logger.Printf("[DEBUG] driver.exec: started process via plugin with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &execHandle{
|
||||
pluginClient: pluginClient,
|
||||
userPid: ps.Pid,
|
||||
executor: exec,
|
||||
allocDir: ctx.AllocDir,
|
||||
isolationConfig: ps.IsolationConfig,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
logger: d.logger,
|
||||
version: d.config.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
|
@ -153,6 +156,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
type execId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
TaskDir string
|
||||
AllocDir *allocdir.AllocDir
|
||||
|
@ -198,6 +202,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
|||
logger: d.logger,
|
||||
version: id.Version,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
@ -209,6 +214,7 @@ func (h *execHandle) ID() string {
|
|||
id := execId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
|
@ -228,7 +234,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *execHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
|
|
@ -47,13 +47,14 @@ type javaHandle struct {
|
|||
executor executor.Executor
|
||||
isolationConfig *cstructs.IsolationConfig
|
||||
|
||||
taskDir string
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
version string
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
taskDir string
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
version string
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewJavaDriver is used to create a new exec driver
|
||||
|
@ -182,6 +183,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
d.logger.Printf("[DEBUG] driver.java: started process with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &javaHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
|
@ -189,7 +191,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
isolationConfig: ps.IsolationConfig,
|
||||
taskDir: taskDir,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
|
@ -210,6 +213,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
|
|||
type javaId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
IsolationConfig *cstructs.IsolationConfig
|
||||
TaskDir string
|
||||
|
@ -257,6 +261,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
|||
logger: d.logger,
|
||||
version: id.Version,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
@ -269,6 +274,7 @@ func (h *javaHandle) ID() string {
|
|||
id := javaId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
TaskDir: h.taskDir,
|
||||
|
@ -289,7 +295,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *javaHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
|
|
@ -44,15 +44,16 @@ type QemuDriverConfig struct {
|
|||
|
||||
// qemuHandle is returned from Start/Open as a handle to the PID
|
||||
type qemuHandle struct {
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
logger *log.Logger
|
||||
version string
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
logger *log.Logger
|
||||
version string
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewQemuDriver is used to create a new exec driver
|
||||
|
@ -219,16 +220,18 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)
|
||||
|
||||
// Create and Return Handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &qemuHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
||||
go h.run()
|
||||
|
@ -236,11 +239,12 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
}
|
||||
|
||||
type qemuId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
}
|
||||
|
||||
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
|
@ -264,15 +268,16 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
|||
|
||||
// Return a driver handle
|
||||
h := &qemuHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
allocDir: id.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
allocDir: id.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
@ -280,11 +285,12 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
|||
|
||||
func (h *qemuHandle) ID() string {
|
||||
id := qemuId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(id)
|
||||
|
@ -300,7 +306,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *qemuHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
|
|
@ -35,15 +35,16 @@ type RawExecDriver struct {
|
|||
|
||||
// rawExecHandle is returned from Start/Open as a handle to the PID
|
||||
type rawExecHandle struct {
|
||||
version string
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
killTimeout time.Duration
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
version string
|
||||
pluginClient *plugin.Client
|
||||
userPid int
|
||||
executor executor.Executor
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// NewRawExecDriver is used to create a new raw exec driver
|
||||
|
@ -125,27 +126,30 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
|||
d.logger.Printf("[DEBUG] driver.raw_exec: started process with pid: %v", ps.Pid)
|
||||
|
||||
// Return a driver handle
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &rawExecHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
allocDir: ctx.AllocDir,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
userPid: ps.Pid,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
allocDir: ctx.AllocDir,
|
||||
version: d.config.Version,
|
||||
logger: d.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
}
|
||||
|
||||
type rawExecId struct {
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
Version string
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
UserPid int
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
}
|
||||
|
||||
func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
|
@ -168,15 +172,16 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
|
|||
|
||||
// Return a driver handle
|
||||
h := &rawExecHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
allocDir: id.AllocDir,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: executor,
|
||||
userPid: id.UserPid,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
allocDir: id.AllocDir,
|
||||
version: id.Version,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
@ -184,11 +189,12 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
|
|||
|
||||
func (h *rawExecHandle) ID() string {
|
||||
id := rawExecId{
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
Version: h.version,
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
UserPid: h.userPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(id)
|
||||
|
@ -204,7 +210,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *rawExecHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
|
|
@ -55,23 +55,25 @@ type RktDriverConfig struct {
|
|||
|
||||
// rktHandle is returned from Start/Open as a handle to the PID
|
||||
type rktHandle struct {
|
||||
pluginClient *plugin.Client
|
||||
executorPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
killTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
pluginClient *plugin.Client
|
||||
executorPid int
|
||||
executor executor.Executor
|
||||
allocDir *allocdir.AllocDir
|
||||
logger *log.Logger
|
||||
killTimeout time.Duration
|
||||
maxKillTimeout time.Duration
|
||||
waitCh chan *cstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// rktPID is a struct to map the pid running the process to the vm image on
|
||||
// disk
|
||||
type rktPID struct {
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
ExecutorPid int
|
||||
KillTimeout time.Duration
|
||||
PluginConfig *PluginReattachConfig
|
||||
AllocDir *allocdir.AllocDir
|
||||
ExecutorPid int
|
||||
KillTimeout time.Duration
|
||||
MaxKillTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewRktDriver is used to create a new exec driver
|
||||
|
@ -227,15 +229,17 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
|||
}
|
||||
|
||||
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmdArgs)
|
||||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &rktHandle{
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
executorPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: d.DriverContext.KillTimeout(task),
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executor: exec,
|
||||
executorPid: ps.Pid,
|
||||
allocDir: ctx.AllocDir,
|
||||
logger: d.logger,
|
||||
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
|
||||
maxKillTimeout: maxKill,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
@ -244,18 +248,18 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
|||
func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
||||
// Parse the handle
|
||||
pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:"))
|
||||
qpid := &rktPID{}
|
||||
if err := json.Unmarshal(pidBytes, qpid); err != nil {
|
||||
id := &rktPID{}
|
||||
if err := json.Unmarshal(pidBytes, id); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse Rkt handle '%s': %v", handleID, err)
|
||||
}
|
||||
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Reattach: qpid.PluginConfig.PluginConfig(),
|
||||
Reattach: id.PluginConfig.PluginConfig(),
|
||||
}
|
||||
executor, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
d.logger.Println("[ERROR] driver.rkt: error connecting to plugin so destroying plugin pid and user pid")
|
||||
if e := destroyPlugin(qpid.PluginConfig.Pid, qpid.ExecutorPid); e != nil {
|
||||
if e := destroyPlugin(id.PluginConfig.Pid, id.ExecutorPid); e != nil {
|
||||
d.logger.Printf("[ERROR] driver.rkt: error destroying plugin and executor pid: %v", e)
|
||||
}
|
||||
return nil, fmt.Errorf("error connecting to plugin: %v", err)
|
||||
|
@ -263,14 +267,15 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
|
|||
|
||||
// Return a driver handle
|
||||
h := &rktHandle{
|
||||
pluginClient: pluginClient,
|
||||
executorPid: qpid.ExecutorPid,
|
||||
allocDir: qpid.AllocDir,
|
||||
executor: executor,
|
||||
logger: d.logger,
|
||||
killTimeout: qpid.KillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
pluginClient: pluginClient,
|
||||
executorPid: id.ExecutorPid,
|
||||
allocDir: id.AllocDir,
|
||||
executor: executor,
|
||||
logger: d.logger,
|
||||
killTimeout: id.KillTimeout,
|
||||
maxKillTimeout: id.MaxKillTimeout,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
|
||||
go h.run()
|
||||
|
@ -280,10 +285,11 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
|
|||
func (h *rktHandle) ID() string {
|
||||
// Return a handle to the PID
|
||||
pid := &rktPID{
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
KillTimeout: h.killTimeout,
|
||||
ExecutorPid: h.executorPid,
|
||||
AllocDir: h.allocDir,
|
||||
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
|
||||
KillTimeout: h.killTimeout,
|
||||
MaxKillTimeout: h.maxKillTimeout,
|
||||
ExecutorPid: h.executorPid,
|
||||
AllocDir: h.allocDir,
|
||||
}
|
||||
data, err := json.Marshal(pid)
|
||||
if err != nil {
|
||||
|
@ -298,7 +304,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
|
||||
func (h *rktHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = task.KillTimeout
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
|
||||
// Update is not possible
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
@ -109,3 +110,26 @@ func validateCommand(command, argField string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetKillTimeout returns the kill timeout to use given the tasks desired kill
|
||||
// timeout and the operator configured max kill timeout.
|
||||
func GetKillTimeout(desired, max time.Duration) time.Duration {
|
||||
maxNanos := max.Nanoseconds()
|
||||
desiredNanos := desired.Nanoseconds()
|
||||
|
||||
// Make the minimum time between signal and kill, 1 second.
|
||||
if desiredNanos <= 0 {
|
||||
desiredNanos = (1 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
// Protect against max not being set properly.
|
||||
if maxNanos <= 0 {
|
||||
maxNanos = (10 * time.Second).Nanoseconds()
|
||||
}
|
||||
|
||||
if desiredNanos < maxNanos {
|
||||
return time.Duration(desiredNanos)
|
||||
}
|
||||
|
||||
return max
|
||||
}
|
||||
|
|
22
client/driver/utils_test.go
Normal file
22
client/driver/utils_test.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDriver_KillTimeout(t *testing.T) {
|
||||
expected := 1 * time.Second
|
||||
max := 10 * time.Second
|
||||
|
||||
if actual := GetKillTimeout(expected, max); expected != actual {
|
||||
t.Fatalf("GetKillTimeout() returned %v; want %v", actual, expected)
|
||||
}
|
||||
|
||||
expected = 10 * time.Second
|
||||
input := 11 * time.Second
|
||||
|
||||
if actual := GetKillTimeout(input, max); expected != actual {
|
||||
t.Fatalf("KillTimeout() returned %v; want %v", actual, expected)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue