Add Driver.Prestart method

The Driver.Prestart method currently does very little, but it lays the
foundation for letting lifecycle plugins interleave execution _after_
task environment setup but _before_ the task starts.

Currently Prestart does two things:

* Performs any driver-specific task environment building
* Downloads Docker images

This change also attaches a TaskEvent emitter to Drivers, so they can
emit events during task initialization.
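
A minimal sketch of a driver built on this hook (ExampleDriver and its
message are hypothetical; the Prestart signature, the embedded
DriverContext, and the emitEvent callback are the ones introduced in this
diff):

type ExampleDriver struct {
	DriverContext
}

// Prestart runs after the task environment is built but before Start, so
// expensive setup such as image downloads belongs here rather than in Start.
func (d *ExampleDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
	// emitEvent is the LogEventFn injected by NewDriverContext.
	d.emitEvent("downloading image for task %q", task.Name)
	// ... do the slow work, stashing any state Start() needs on d ...
	return nil
}

The task runner logs each emitted message and records it as a
TaskInitializing event, whose InitializationMessage is shown in the alloc
status output.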
Michael Schurter 2016-11-29 16:39:36 -08:00
parent 1c4195b985
commit 770ed703d0
21 changed files with 284 additions and 78 deletions

View File

@ -252,30 +252,32 @@ const (
TaskSiblingFailed = "Sibling task failed"
TaskSignaling = "Signaling"
TaskRestartSignal = "Restart Signaled"
TaskInitializing = "Initializing"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
// appropriate to the event's type.
type TaskEvent struct {
Type string
Time int64
FailsTask bool
RestartReason string
SetupError string
DriverError string
ExitCode int
Signal int
Message string
KillReason string
KillTimeout time.Duration
KillError string
StartDelay int64
DownloadError string
ValidationError string
DiskLimit int64
DiskSize int64
FailedSibling string
VaultError string
TaskSignalReason string
TaskSignal string
Type string
Time int64
FailsTask bool
RestartReason string
SetupError string
DriverError string
ExitCode int
Signal int
Message string
KillReason string
KillTimeout time.Duration
KillError string
StartDelay int64
DownloadError string
ValidationError string
DiskLimit int64
DiskSize int64
FailedSibling string
VaultError string
TaskSignalReason string
TaskSignal string
InitializationMessage string
}

View File

@ -792,7 +792,7 @@ func (c *Client) setupDrivers() error {
var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil)
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.

View File

@ -91,6 +91,10 @@ const (
type DockerDriver struct {
DriverContext
imageID string
waitClient *docker.Client
driverConfig *DockerDriverConfig
}
type DockerDriverAuth struct {
@ -339,31 +343,29 @@ func (d *DockerDriver) Abilities() DriverAbilities {
}
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
// Set environment variables.
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath).
SetTaskLocalDir(allocdir.TaskLocalContainerPath).SetSecretsDir(allocdir.TaskSecretsContainerPath).Build()
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
return nil, err
return err
}
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
return fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Initialize docker API clients
client, waitClient, err := d.dockerClients()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
return fmt.Errorf("Failed to connect to docker daemon: %s", err)
}
if err := d.createImage(driverConfig, client, taskDir); err != nil {
return nil, err
return err
}
image := driverConfig.ImageName
@ -371,14 +373,27 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
dockerImage, err := client.InspectImage(image)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %s: %s", image, err)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
return fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
d.logger.Printf("[DEBUG] driver.docker: identified image %s as %s", image, dockerImage.ID)
// Set state needed by Start()
d.imageID = dockerImage.ID
d.waitClient = waitClient
d.driverConfig = driverConfig
return nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", pluginLogFile),
@ -404,9 +419,9 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
// Only launch syslog server if we're going to use it!
syslogAddr := ""
if runtime.GOOS == "darwin" && len(driverConfig.Logging) == 0 {
if runtime.GOOS == "darwin" && len(d.driverConfig.Logging) == 0 {
d.logger.Printf("[DEBUG] driver.docker: disabling syslog driver as Docker for Mac workaround")
} else if len(driverConfig.Logging) == 0 || driverConfig.Logging[0].Type == "syslog" {
} else if len(d.driverConfig.Logging) == 0 || d.driverConfig.Logging[0].Type == "syslog" {
ss, err := exec.LaunchSyslogServer()
if err != nil {
pluginClient.Kill()
@ -415,11 +430,11 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
syslogAddr = ss.Addr
}
config, err := d.createContainerConfig(ctx, task, driverConfig, syslogAddr)
config, err := d.createContainerConfig(ctx, task, d.driverConfig, syslogAddr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", d.imageID, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", d.imageID, err)
}
container, rerr := d.createContainer(config)
@ -432,17 +447,17 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)
cleanupImage := d.config.ReadBoolDefault("docker.cleanup.image", true)
// We don't need to start the container if the container is already running
// since we don't create containers which are already present on the host
// and are running
if !container.State.Running {
// Start the container
err := d.startContainer(container)
if err != nil {
if err := client.StartContainer(container.ID, container.HostConfig); err != nil {
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
pluginClient.Kill()
err.Err = fmt.Sprintf("Failed to start container %s: %s", container.ID, err)
return nil, err
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
} else {
@ -454,12 +469,12 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
waitClient: waitClient,
waitClient: d.waitClient,
executor: exec,
pluginClient: pluginClient,
cleanupImage: cleanupImage,
logger: d.logger,
imageID: dockerImage.ID,
imageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),

View File

@ -101,6 +101,10 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle
driver := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := driver.Prestart(execCtx, task); err != nil {
execCtx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
execCtx.AllocDir.Destroy()
@ -167,6 +171,9 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -256,6 +263,9 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
// Copy the image into the task's directory
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -316,9 +326,9 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)
_, err := d.Start(execCtx, task)
err := d.Prestart(execCtx, task)
if err == nil {
t.Fatalf("want err: %v", err)
t.Fatalf("want error in prestart: %v", err)
}
if rerr, ok := err.(*structs.RecoverableError); !ok {
@ -366,6 +376,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -456,6 +469,9 @@ func TestDockerDriver_StartN(t *testing.T) {
d := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@ -513,6 +529,9 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
copyImage(execCtx, task, "busybox_musl.tar", t)
copyImage(execCtx, task, "busybox_glibc.tar", t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@ -804,6 +823,10 @@ func TestDockerDriver_User(t *testing.T) {
defer execCtx.AllocDir.Destroy()
copyImage(execCtx, task, "busybox.tar", t)
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
// It should fail because the user "alice" does not exist on the given
// image.
handle, err := driver.Start(execCtx, task)
@ -953,6 +976,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -1035,7 +1061,11 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
t.Fatalf("Failed to get task env: %v", err)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv)
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driver := NewDockerDriver(driverCtx)
copyImage(execCtx, task, "busybox.tar", t)
@ -1058,6 +1088,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
if _, err := driver.Start(execCtx, task); err == nil {
t.Fatalf("Started driver successfully when volumes should have been disabled.")
}
@ -1068,6 +1101,9 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".")
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -1106,6 +1142,9 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
if err := driver.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("Failed to start docker driver: %v", err)

View File

@ -51,6 +51,10 @@ type Driver interface {
// Drivers must support the fingerprint interface for detection
fingerprint.Fingerprint
// Prestart prepares the task environment and performs expensive
// initialization steps like downloading images.
Prestart(*ExecContext, *structs.Task) error
// Start is used to begin task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@ -70,6 +74,9 @@ type DriverAbilities struct {
SendSignals bool
}
// LogEventFn is a callback which allows Drivers to emit task events.
type LogEventFn func(message string, args ...interface{})
// DriverContext is a means to inject dependencies such as loggers, configs, and
// node attributes into a Driver without having to change the Driver interface
// each time we do it. Used in conjunction with Factory, above.
@ -79,18 +86,14 @@ type DriverContext struct {
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnvironment
emitEvent LogEventFn
}
// NewEmptyDriverContext returns a DriverContext with all fields set to their
// zero value.
func NewEmptyDriverContext() *DriverContext {
return &DriverContext{
taskName: "",
config: nil,
node: nil,
logger: nil,
taskEnv: nil,
}
return &DriverContext{}
}
// NewDriverContext initializes a new DriverContext with the specified fields.
@ -98,13 +101,14 @@ func NewEmptyDriverContext() *DriverContext {
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName string, config *config.Config, node *structs.Node,
logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext {
logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext {
return &DriverContext{
taskName: taskName,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
taskName: taskName,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
emitEvent: eventEmitter,
}
}

View File

@ -89,7 +89,11 @@ func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
return nil, nil
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv)
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter)
return driverCtx, execCtx
}

View File

@ -92,6 +92,13 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *ExecDriver) Prestart(execctx *ExecContext, task *structs.Task) error {
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
return nil
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig ExecDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
@ -104,10 +111,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
// Get the task directory for storing the executor logs.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {

View File

@ -65,6 +65,9 @@ func TestExecDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -105,6 +108,9 @@ func TestExecDriver_KillUserPid_OnPluginReconnectFailure(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -165,6 +171,9 @@ func TestExecDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -215,6 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -265,6 +277,9 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -327,6 +342,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -388,6 +406,9 @@ func TestExecDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()

View File

@ -163,16 +163,19 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *JavaDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
return nil
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig JavaDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)

View File

@ -92,6 +92,9 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -142,6 +145,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -204,6 +210,9 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -262,6 +271,9 @@ func TestJavaDriver_Signal(t *testing.T) {
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -317,6 +329,9 @@ func TestJavaDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewJavaDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()

View File

@ -168,6 +168,10 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
return true, nil
}
func (d *LxcDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
// Start starts the LXC Driver
func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig LxcDriverConfig

View File

@ -69,6 +69,9 @@ func TestLxcDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewLxcDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -141,6 +144,9 @@ func TestLxcDriver_Open_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewLxcDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -75,6 +75,10 @@ func (d *MockDriver) Abilities() DriverAbilities {
}
}
func (d *MockDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
// Start starts the mock driver
func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig MockDriverConfig

View File

@ -135,6 +135,10 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *QemuDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
return nil
}
// Run an existing Qemu image. Start() will pull down an existing, valid Qemu
// image and save it to the Drivers Allocation Dir
func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {

View File

@ -107,6 +107,13 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
return false, nil
}
func (d *RawExecDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
return nil
}
func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig ExecDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
@ -125,10 +132,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)

View File

@ -75,6 +75,9 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -120,6 +123,9 @@ func TestRawExecDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -170,6 +176,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -219,6 +228,9 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -268,6 +280,9 @@ func TestRawExecDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@ -313,6 +328,9 @@ done
fmt.Errorf("Failed to write data")
}
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("prestart err: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -206,6 +206,13 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) error {
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
return nil
}
// Run an existing Rkt image.
func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig RktDriverConfig
@ -289,10 +296,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
// Inject environment variables
d.taskEnv.SetAllocDir(allocdir.SharedAllocContainerPath)
d.taskEnv.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
d.taskEnv.SetSecretsDir(allocdir.TaskSecretsContainerPath)
d.taskEnv.Build()
for k, v := range d.taskEnv.EnvMap() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
}

View File

@ -98,6 +98,9 @@ func TestRktDriver_Start_DNS(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -145,6 +148,9 @@ func TestRktDriver_Start_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -202,6 +208,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -252,6 +261,7 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
"-c",
fmt.Sprintf(`echo -n %s > foo/%s`, string(exp), file),
},
"net": []string{"none"},
"volumes": []string{fmt.Sprintf("%s:/foo", tmpvol)},
},
LogConfig: &structs.LogConfig{
@ -268,6 +278,9 @@ func TestRktDriver_Start_Wait_AllocDir(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -326,6 +339,9 @@ func TestRktDriverUser(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@ -364,6 +380,9 @@ func TestRktTrustPrefix(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err == nil {
handle.Kill()
@ -438,6 +457,9 @@ func TestRktDriver_PortsMapping(t *testing.T) {
d := NewRktDriver(driverCtx)
if err := d.Prestart(execCtx, task); err != nil {
t.Fatalf("error in prestart: %v", err)
}
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -346,7 +346,15 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
}
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env)
// Create a task-specific event emitter callback to expose minimal
// state to drivers
eventEmitter := func(m string, args ...interface{}) {
msg := fmt.Sprintf(m, args...)
r.logger.Printf("[DEBUG] client: initialization event for alloc %q: %s", r.alloc.ID, msg)
r.setState("", structs.NewTaskEvent(structs.TaskInitializing).SetInitializationMessage(msg))
}
driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
@ -1019,17 +1027,31 @@ func (r *TaskRunner) startTask() error {
// Create a driver
driver, err := r.createDriver()
if err != nil {
return fmt.Errorf("failed to create driver of task '%s' for alloc '%s': %v",
return fmt.Errorf("failed to create driver of task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
}
// Run prestart
if err := driver.Prestart(r.ctx, r.task); err != nil {
wrapped := fmt.Errorf("failed to initialize task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
r.logger.Printf("[WARN] client: %v", wrapped)
if rerr, ok := err.(*structs.RecoverableError); ok {
return structs.NewRecoverableError(wrapped, rerr.Recoverable)
}
return wrapped
}
// Start the job
handle, err := driver.Start(r.ctx, r.task)
if err != nil {
wrapped := fmt.Errorf("failed to start task '%s' for alloc '%s': %v",
wrapped := fmt.Errorf("failed to start task %q for alloc %q: %v",
r.task.Name, r.alloc.ID, err)
r.logger.Printf("[INFO] client: %v", wrapped)
r.logger.Printf("[WARN] client: %v", wrapped)
if rerr, ok := err.(*structs.RecoverableError); ok {
return structs.NewRecoverableError(wrapped, rerr.Recoverable)

View File

@ -378,6 +378,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = "Task signaled to restart"
}
case api.TaskInitializing:
desc = event.InitializationMessage
}
// Reverse order so we are sorted by time

View File

@ -2555,6 +2555,10 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
// TaskInitializing indicates that a task is performing a potentially
// slow initialization action such as downloading a Docker image.
TaskInitializing = "Initializing"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
@ -2613,6 +2617,9 @@ type TaskEvent struct {
// TaskSignal is the signal that was sent to the task
TaskSignal string
// InitializationMessage indicates the initialization step being executed.
InitializationMessage string
}
func (te *TaskEvent) GoString() string {
@ -2741,6 +2748,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent {
return e
}
func (e *TaskEvent) SetInitializationMessage(m string) *TaskEvent {
e.InitializationMessage = m
return e
}
// TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter