Return errors from cleanup and let TaskRunner retry

This commit is contained in:
Michael Schurter 2017-01-12 17:21:54 -08:00
parent ec81325ddc
commit dc68aa1a5a
12 changed files with 178 additions and 78 deletions

View file

@ -376,27 +376,25 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
}
// Ensure the image is available
pulled, err := d.createImage(driverConfig, client, ctx.TaskDir)
if err != nil {
if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
return nil, err
}
if pulled {
// An image was downloaded, add the ID to created resources
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
// When an error occurs we leak the image, but this should be
// extremely rare. There's no point in returning an error
// because the image won't be redownloaded as it now exists.
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, nil
}
res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
return res, nil
// Regardless of whether the image was downloaded already or not, store
// it as a created resource. Cleanup will soft fail if the image is
// still in use by another container.
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
// When an error occurs we leak the image, but this should be
// extremely rare. There's no point in returning an error
// because the image won't be redownloaded as it now exists.
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, nil
}
return nil, nil
res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
return res, nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@ -498,23 +496,40 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return h, nil
}
func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) {
if res == nil {
// Nothing to cleanup
return
func (d *DockerDriver) Cleanup(_ *ExecContext, key, value string) error {
switch key {
case dockerImageResKey:
return d.cleanupImage(value)
default:
d.logger.Printf("[WARN] driver.docker: unknown resource to cleanup: %q -> %q", key, value)
return nil
}
}
// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *DockerDriver) cleanupImage(id string) error {
if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) {
// Config says not to cleanup
return nil
}
// If the client should cleanup images and the image was downloaded, remove it
cleanupImage := d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault)
if ids := res.Resources[dockerImageResKey]; cleanupImage && len(ids) > 0 {
for _, id := range ids {
if err := client.RemoveImage(id); err != nil {
d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err)
} else {
d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
}
if err := client.RemoveImage(id); err != nil {
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return nil
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return nil
}
d.logger.Printf("[WARN] driver.docker: cleanup failed to remove downloaded image %q: %v", id, err)
return err
}
d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
return nil
}
// dockerClients creates two *docker.Client, one for long running operations and
@ -914,7 +929,7 @@ func (d *DockerDriver) Periodic() (bool, time.Duration) {
// loading it from the file system
//
// Returns true if an image was downloaded and should be cleaned up.
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (bool, error) {
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
image := driverConfig.ImageName
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
@ -929,20 +944,20 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists, nothing to do
return false, nil
return nil
}
}
// Load the image if specified
if len(driverConfig.LoadImages) > 0 {
return false, d.loadImage(driverConfig, client, taskDir)
return d.loadImage(driverConfig, client, taskDir)
}
// Download the image
if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
return false, err
return err
}
return true, nil
return nil
}
// pullImage creates an image by pulling it from a docker registry

View file

@ -99,25 +99,22 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
driver := NewDockerDriver(tctx.DriverCtx)
copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar")
res, err := driver.Prestart(tctx.ExecCtx, task)
_, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(tctx.ExecCtx, task)
if err != nil {
driver.Cleanup(tctx.ExecCtx, res)
tctx.AllocDir.Destroy()
t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack())
}
if handle == nil {
driver.Cleanup(tctx.ExecCtx, res)
tctx.AllocDir.Destroy()
t.Fatalf("handle is nil\nStack\n%s", debug.Stack())
}
cleanup := func() {
driver.Cleanup(tctx.ExecCtx, res)
handle.Kill()
tctx.AllocDir.Destroy()
}
@ -186,19 +183,16 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
d.Cleanup(ctx.ExecCtx, res)
t.Fatalf("err: %v", err)
}
if handle == nil {
d.Cleanup(ctx.ExecCtx, res)
t.Fatalf("missing handle")
}
defer handle.Kill()
@ -285,11 +279,10 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
// Copy the image into the task's directory
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -351,9 +344,8 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err == nil {
d.Cleanup(ctx.ExecCtx, res)
t.Fatalf("want error in prestart: %v", err)
}
@ -403,11 +395,10 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -498,11 +489,10 @@ func TestDockerDriver_StartN(t *testing.T) {
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handles[idx], err = d.Start(ctx.ExecCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@ -559,11 +549,10 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
copyImage(t, ctx.ExecCtx.TaskDir, "busybox_musl.tar")
copyImage(t, ctx.ExecCtx.TaskDir, "busybox_glibc.tar")
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart #%d: %v", idx+1, err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handles[idx], err = d.Start(ctx.ExecCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
@ -932,11 +921,10 @@ func TestDockerDriver_User(t *testing.T) {
defer ctx.AllocDir.Destroy()
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
res, err := driver.Prestart(ctx.ExecCtx, task)
_, err := driver.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer driver.Cleanup(ctx.ExecCtx, res)
// It should fail because the user "alice" does not exist on the given
// image.
@ -1090,11 +1078,10 @@ done
fmt.Errorf("Failed to write data")
}
res, err := d.Prestart(ctx.ExecCtx, task)
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer d.Cleanup(ctx.ExecCtx, res)
handle, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -1214,11 +1201,10 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, _, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
res, err := driver.Prestart(execCtx, task)
_, err = driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer driver.Cleanup(execCtx, res)
if _, err := driver.Start(execCtx, task); err == nil {
t.Fatalf("Started driver successfully when volumes should have been disabled.")
}
@ -1229,11 +1215,10 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) {
task, driver, execCtx, fn, cleanup := setupDockerVolumes(t, cfg, ".")
defer cleanup()
res, err := driver.Prestart(execCtx, task)
_, err := driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer driver.Cleanup(execCtx, res)
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -1267,11 +1252,10 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) {
task, driver, execCtx, hostpath, cleanup := setupDockerVolumes(t, cfg, tmpvol)
defer cleanup()
res, err := driver.Prestart(execCtx, task)
_, err = driver.Prestart(execCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
defer driver.Cleanup(execCtx, res)
handle, err := driver.Start(execCtx, task)
if err != nil {
t.Fatalf("Failed to start docker driver: %v", err)
@ -1320,12 +1304,20 @@ func TestDockerDriver_Cleanup(t *testing.T) {
}
// Cleanup
driver.Cleanup(tctx.ExecCtx, res)
if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil {
t.Fatalf("Cleanup failed: %v", err)
}
// Ensure image was removed
if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil {
t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName)
}
// The image doesn't exist which shouldn't be an error when calling
// Cleanup, so call it again to make sure.
if err := driver.Cleanup(tctx.ExecCtx, dockerImageResKey, res.Resources[dockerImageResKey][0]); err != nil {
t.Fatalf("Cleanup failed: %v", err)
}
}
func copyImage(t *testing.T, taskDir *allocdir.TaskDir, image string) {

View file

@ -137,8 +137,9 @@ type Driver interface {
// Cleanup is called to remove resources which were created for a task
// and no longer needed.
//
// Cleanup should log any errors it encounters.
Cleanup(*ExecContext, *CreatedResources)
// Cleanup is called once for every value for every key in
// CreatedResources. Errors will cause Cleanup to be retried.
Cleanup(ctx *ExecContext, key, value string) error
// Drivers must validate their configuration
Validate(map[string]interface{}) error

View file

@ -173,7 +173,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return h, nil
}
func (d *ExecDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *ExecDriver) Cleanup(*ExecContext, string, string) error { return nil }
type execId struct {
Version string

View file

@ -260,7 +260,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return h, nil
}
func (d *JavaDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *JavaDriver) Cleanup(*ExecContext, string, string) error { return nil }
// cgroupsMounted returns true if the cgroups are mounted on a system otherwise
// returns false

View file

@ -286,7 +286,7 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return &handle, nil
}
func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *LxcDriver) Cleanup(*ExecContext, string, string) error { return nil }
// Open creates the driver to monitor an existing LXC container
func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {

View file

@ -8,6 +8,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/mitchellh/mapstructure"
@ -62,6 +63,8 @@ type MockDriverConfig struct {
type MockDriver struct {
DriverContext
fingerprint.StaticFingerprinter
cleanupFailNum int
}
// NewMockDriver is a factory method which returns a new Mock Driver
@ -124,7 +127,14 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return &h, nil
}
func (m *MockDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (m *MockDriver) Cleanup(ctx *ExecContext, k, v string) error {
n, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"])
if k == m.config.Options["cleanup_fail_on"] && m.cleanupFailNum < n {
m.cleanupFailNum++
return fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, n)
}
return nil
}
// Validate validates the mock driver configuration
func (m *MockDriver) Validate(map[string]interface{}) error {

View file

@ -341,7 +341,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
return h, nil
}
func (d *QemuDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *QemuDriver) Cleanup(*ExecContext, string, string) error { return nil }
func (h *qemuHandle) ID() string {
id := qemuId{

View file

@ -182,7 +182,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return h, nil
}
func (d *RawExecDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *RawExecDriver) Cleanup(*ExecContext, string, string) error { return nil }
type rawExecId struct {
Version string

View file

@ -456,7 +456,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return h, nil
}
func (d *RktDriver) Cleanup(*ExecContext, *CreatedResources) {}
func (d *RktDriver) Cleanup(*ExecContext, string, string) error { return nil }
func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
// Parse the handle

View file

@ -883,7 +883,6 @@ func (r *TaskRunner) run() {
select {
case success := <-prestartResultCh:
if !success {
//FIXME is this necessary?
r.cleanup()
r.setState(structs.TaskStateDead, nil)
return
@ -1021,12 +1020,45 @@ func (r *TaskRunner) run() {
// cleanup calls Driver.Cleanup when a task is stopping. Failed cleanups are
// retried (with backoff) up to 3 times; resources still failing after that
// are logged as errors and leaked.
func (r *TaskRunner) cleanup() {
if drv, err := r.createDriver(); err != nil {
r.logger.Printf("[WARN] client: error creating driver to cleanup resources: %v", err)
} else {
ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
drv.Cleanup(ctx, r.createdResources)
drv, err := r.createDriver()
if err != nil {
r.logger.Printf("[ERR] client: error creating driver to cleanup resources: %v", err)
return
}
ctx := driver.NewExecContext(r.taskDir, r.alloc.ID)
attempts := 1
for ; len(r.createdResources.Resources) > 0; attempts++ {
for k, items := range r.createdResources.Resources {
var retry []string
for _, v := range items {
if err := drv.Cleanup(ctx, k, v); err != nil {
r.logger.Printf("[WARN] client: error cleaning up resource %s:%s for task %q (attempt %d): %v", k, v, r.task.Name, attempts, err)
retry = append(retry, v)
continue
}
}
if len(retry) > 0 {
// At least one cleanup failed; keep it alive for retrying
r.createdResources.Resources[k] = retry
} else {
// No failures, remove resource
delete(r.createdResources.Resources, k)
}
}
// Retry 3 times with sleeps between
if attempts > 3 {
break
}
time.Sleep(time.Duration(attempts) * time.Second)
}
if len(r.createdResources.Resources) > 0 {
r.logger.Printf("[ERR] client: error cleaning up resources for task %q after %d attempts", r.task.Name, attempts)
}
return
}
// shouldRestart returns if the task should restart. If the return value is

View file

@ -1305,3 +1305,53 @@ func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) {
t.Fatalf("Bad; got %v; want %v", string(data), string(expected))
}
}
func TestTaskRunner_CleanupOK(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "1",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x", "y"}
ctx.tr.createdResources.Resources["foo"] = []string{"z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we only failed once, createdResources should be empty
if len(ctx.tr.createdResources.Resources) > 0 {
t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources)
}
}
func TestTaskRunner_CleanupFail(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "5",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x"}
ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we failed > 3 times, the failed key should remain
expected := map[string][]string{key: {"x"}}
if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) {
t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources)
}
}