Docker Image Coordinator

This PR introduces a coordinator for performing CRUD operations on Docker
images. It should fix racy deletion of images: previously, an image could be
deleted between prestart and start, causing an error.
This commit is contained in:
Alex Dadgar 2017-02-24 13:20:40 -08:00
parent 2239503e6e
commit b5d4f39734
6 changed files with 665 additions and 134 deletions
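The essence of the fix is to reference-count images rather than delete them eagerly. Below is a minimal standalone sketch of the idea (hypothetical names, not the coordinator's actual API): an image becomes eligible for deletion only once every task that acquired it has released it, so a task sitting between prestart and start still holds a reference and the image cannot be removed out from under it.

package main

import (
	"fmt"
	"sync"
)

// refCounter models the coordinator's core invariant: delete only at zero refs.
type refCounter struct {
	mu   sync.Mutex
	refs map[string]int
}

func (r *refCounter) acquire(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.refs[id]++
}

func (r *refCounter) release(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.refs[id]--
	if r.refs[id] == 0 {
		delete(r.refs, id)
		fmt.Printf("image %s now unreferenced; safe to delete\n", id)
	}
}

func main() {
	rc := &refCounter{refs: make(map[string]int)}
	rc.acquire("busybox") // task A: prestart
	rc.acquire("busybox") // task B: prestart
	rc.release("busybox") // task A: cleanup; B still holds a ref, image survives
	rc.release("busybox") // task B: cleanup; now deletable
}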

View File

@ -233,6 +233,29 @@ func (c *Config) ReadBoolDefault(id string, defaultValue bool) bool {
return val
}
// ReadDuration parses the specified option as a duration.
func (c *Config) ReadDuration(id string) (time.Duration, error) {
val, ok := c.Options[id]
if !ok {
return time.Duration(0), fmt.Errorf("Specified config is missing from options")
}
dval, err := time.ParseDuration(val)
if err != nil {
return time.Duration(0), fmt.Errorf("Failed to parse %s as time duration: %s", val, err)
}
return dval, nil
}
// ReadDurationDefault tries to parse the specified option as a duration. If there is
// an error in parsing, the default value is returned.
func (c *Config) ReadDurationDefault(id string, defaultValue time.Duration) time.Duration {
val, err := c.ReadDuration(id)
if err != nil {
return defaultValue
}
return val
}
// ReadStringListToMap tries to parse the specified option as a comma separated list.
// If there is an error in parsing, an empty list is returned.
func (c *Config) ReadStringListToMap(key string) map[string]struct{} {
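For context, here is a standalone sketch of the parse-with-fallback pattern that ReadDuration and ReadDurationDefault implement, built on the standard library's time.ParseDuration (readDurationDefault here is a hypothetical free function standing in for the Config method):

package main

import (
	"fmt"
	"time"
)

// readDurationDefault returns the parsed duration for key, or def when the key
// is absent or its value does not parse.
func readDurationDefault(options map[string]string, key string, def time.Duration) time.Duration {
	val, ok := options[key]
	if !ok {
		return def
	}
	d, err := time.ParseDuration(val)
	if err != nil {
		return def
	}
	return d
}

func main() {
	opts := map[string]string{"docker.cleanup.image.delay": "5m"}
	fmt.Println(readDurationDefault(opts, "docker.cleanup.image.delay", 3*time.Minute)) // 5m0s
	fmt.Println(readDurationDefault(opts, "docker.missing.key", 3*time.Minute))         // 3m0s
}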

View File

@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
@ -58,11 +57,7 @@ var (
recoverableErrTimeouts = func(err error) error {
r := false
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
strings.Contains(err.Error(), "EOF") ||
// TODO Remove when we implement global co-ordination among docker
// drivers to not remove images which are in use by instances of
// other drivers
strings.Contains(err.Error(), "no such image") {
strings.Contains(err.Error(), "EOF") {
r = true
}
return structs.NewRecoverableError(err, r)
@ -96,6 +91,11 @@ const (
dockerCleanupImageConfigOption = "docker.cleanup.image"
dockerCleanupImageConfigDefault = true
// dockerImageRemoveDelayConfigOption is the key for setting the delay before
// an unused image is removed
dockerImageRemoveDelayConfigOption = "docker.cleanup.image.delay"
dockerImageRemoveDelayConfigDefault = 3 * time.Minute
// dockerTimeout is the length of time a request can be outstanding before
// it is timed out.
dockerTimeout = 5 * time.Minute
@ -130,7 +130,7 @@ type DockerLoggingOpts struct {
type DockerDriverConfig struct {
ImageName string `mapstructure:"image"` // Container's Image Name
LoadImages []string `mapstructure:"load"` // LoadImage is array of paths to image archive files
LoadImage string `mapstructure:"load"` // LoadImage is a path to an image archive file
Command string `mapstructure:"command"` // The Command to run when the container starts up
Args []string `mapstructure:"args"` // The arguments to the Command
IpcMode string `mapstructure:"ipc_mode"` // The IPC mode of the container - host and none
@ -191,11 +191,11 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke
dconf.UTSMode = env.ReplaceEnv(dconf.UTSMode)
dconf.Hostname = env.ReplaceEnv(dconf.Hostname)
dconf.WorkDir = env.ReplaceEnv(dconf.WorkDir)
dconf.LoadImage = env.ReplaceEnv(dconf.LoadImage)
dconf.Volumes = env.ParseAndReplace(dconf.Volumes)
dconf.VolumeDriver = env.ReplaceEnv(dconf.VolumeDriver)
dconf.DNSServers = env.ParseAndReplace(dconf.DNSServers)
dconf.DNSSearchDomains = env.ParseAndReplace(dconf.DNSSearchDomains)
dconf.LoadImages = env.ParseAndReplace(dconf.LoadImages)
for _, m := range dconf.LabelsRaw {
for k, v := range m {
@ -241,6 +241,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke
type dockerPID struct {
Version string
Image string
ImageID string
ContainerID string
KillTimeout time.Duration
@ -254,6 +255,8 @@ type DockerHandle struct {
client *docker.Client
waitClient *docker.Client
logger *log.Logger
Image string
ImageID string
containerID string
version string
clkSpeed float64
@ -321,7 +324,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
Required: true,
},
"load": &fields.FieldSchema{
Type: fields.TypeArray,
Type: fields.TypeString,
},
"command": &fields.FieldSchema{
Type: fields.TypeString,
@ -416,6 +419,18 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}
// getDockerCoordinator returns the docker coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
config := &dockerCoordinatorConfig{
client: client,
cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
logger: d.logger,
removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
}
return GetDockerCoordinator(config)
}
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
@ -432,22 +447,14 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
}
// Ensure the image is available
if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
return nil, err
}
// Regardless of whether the image was downloaded already or not, store
// it as a created resource. Cleanup will soft fail if the image is
// still in use by another container.
dockerImage, err := client.InspectImage(driverConfig.ImageName)
id, err := d.createImage(driverConfig, client, ctx.TaskDir)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, err
}
res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
d.imageID = dockerImage.ID
res.Add(dockerImageResKey, id)
d.imageID = id
return res, nil
}
@ -535,6 +542,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
executor: exec,
pluginClient: pluginClient,
logger: d.logger,
Image: d.driverConfig.ImageName,
ImageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
@ -585,20 +594,9 @@ func (d *DockerDriver) cleanupImage(id string) error {
return nil
}
if err := client.RemoveImage(id); err != nil {
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return nil
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return nil
}
// Retry on unknown errors
return structs.NewRecoverableError(err, true)
}
coordinator := d.getDockerCoordinator(client)
coordinator.RemoveImage(id)
d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
return nil
}
@ -942,35 +940,21 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
}, nil
}
var (
// imageNotFoundMatcher is a regular expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)
// recoverablePullError wraps the error returned when trying to pull an image,
// marking whether the error is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}
func (d *DockerDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
// createImage creates a docker image either by pulling it from a registry or by
// loading it from the file system
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (string, error) {
image := driverConfig.ImageName
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
coordinator := d.getDockerCoordinator(client)
// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
// bother to check and cache the id here. We'll download first, then cache.
@ -978,46 +962,38 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag)
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists, nothing to do
return nil
// Image exists so just increment its reference count
coordinator.IncrementImageReference(dockerImage.ID, image)
return dockerImage.ID, nil
}
}
// Load the image if specified
if len(driverConfig.LoadImages) > 0 {
if driverConfig.LoadImage != "" {
return d.loadImage(driverConfig, client, taskDir)
}
// Download the image
if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
return err
}
return nil
return d.pullImage(driverConfig, client, repo, tag)
}
// pullImage creates an image by pulling it from a docker registry
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo string, tag string) error {
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}
authOptions := docker.AuthConfiguration{}
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo, tag string) (id string, err error) {
var authOptions *docker.AuthConfiguration
if len(driverConfig.Auth) != 0 {
authOptions = docker.AuthConfiguration{
authOptions = &docker.AuthConfiguration{
Username: driverConfig.Auth[0].Username,
Password: driverConfig.Auth[0].Password,
Email: driverConfig.Auth[0].Email,
ServerAddress: driverConfig.Auth[0].ServerAddress,
}
} else if authConfigFile := d.config.Read("docker.auth.config"); authConfigFile != "" {
authOptionsPtr, err := authOptionFrom(authConfigFile, repo)
authOptions, err = authOptionFrom(authConfigFile, repo)
if err != nil {
d.logger.Printf("[INFO] driver.docker: failed to find docker auth for repo %q: %v", repo, err)
return fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
}
authOptions = *authOptionsPtr
if authOptions.Email == "" && authOptions.Password == "" &&
authOptions.ServerAddress == "" && authOptions.Username == "" {
d.logger.Printf("[DEBUG] driver.docker: did not find docker auth for repo %q", repo)
@ -1025,33 +1001,35 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
}
d.emitEvent("Downloading image %s:%s", repo, tag)
err := client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return d.recoverablePullError(err, driverConfig.ImageName)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
return nil
coordinator := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions)
}
// loadImage creates an image by loading it from the file system
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
var errors multierror.Error
for _, image := range driverConfig.LoadImages {
archive := filepath.Join(taskDir.LocalDir, image)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)
f, err := os.Open(archive)
if err != nil {
errors.Errors = append(errors.Errors, fmt.Errorf("unable to open image archive: %v", err))
continue
}
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
errors.Errors = append(errors.Errors, err)
}
f.Close()
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client,
taskDir *allocdir.TaskDir) (id string, err error) {
archive := filepath.Join(taskDir.LocalDir, driverConfig.LoadImage)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)
f, err := os.Open(archive)
if err != nil {
return "", fmt.Errorf("unable to open image archive: %v", err)
}
return errors.ErrorOrNil()
defer f.Close()
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
return "", err
}
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
return "", recoverableErrTimeouts(err)
}
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
return dockerImage.ID, nil
}
// createContainer creates the container given the passed configuration. It
@ -1202,6 +1180,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
ver, _ := exec.Version()
d.logger.Printf("[DEBUG] driver.docker: version of executor: %v", ver.Version)
// Increment the reference count since we successfully attached to this
// container
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image)
// Return a driver handle
h := &DockerHandle{
client: client,
@ -1230,6 +1213,8 @@ func (h *DockerHandle) ID() string {
pid := dockerPID{
Version: h.version,
ContainerID: h.containerID,
Image: h.Image,
ImageID: h.ImageID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),

View File

@ -0,0 +1,288 @@
package driver
import (
"context"
"fmt"
"log"
"regexp"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// createCoordinator allows us to only create a single coordinator
createCoordinator sync.Once
// globalCoordinator is the shared coordinator and should only be retrieved
// using the GetDockerCoordinator() method.
globalCoordinator *dockerCoordinator
// imageNotFoundMatcher is a regular expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)
// pullFuture is a shareable future for retrieving a pulled image's ID and any
// error that may have occurred during the pull.
type pullFuture struct {
waitCh chan struct{}
err error
imageID string
}
// newPullFuture returns a new pull future
func newPullFuture() *pullFuture {
return &pullFuture{
waitCh: make(chan struct{}),
}
}
// wait blocks until the future has a result
func (p *pullFuture) wait() *pullFuture {
<-p.waitCh
return p
}
// result returns the results of the future and should only ever be called after
// wait returns.
func (p *pullFuture) result() (imageID string, err error) {
return p.imageID, p.err
}
// set is used to set the results and unblock any waiter. This may only be
// called once.
func (p *pullFuture) set(imageID string, err error) {
p.imageID = imageID
p.err = err
close(p.waitCh)
}
// DockerImageClient provides the methods required to do CRUD operations on
// Docker images
type DockerImageClient interface {
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
InspectImage(id string) (*docker.Image, error)
RemoveImage(id string) error
}
// dockerCoordinatorConfig is used to configure the Docker coordinator.
type dockerCoordinatorConfig struct {
// logger is the logger the coordinator should use
logger *log.Logger
// cleanup marks whether images should be deleted when the reference count
// is zero
cleanup bool
// client is the Docker client to use for communicating with Docker
client DockerImageClient
// removeDelay is the delay between an image's reference count going to
// zero and the image actually being deleted.
removeDelay time.Duration
}
// dockerCoordinator is used to coordinate actions against images to prevent
// racy deletions. It can be thought of as a reference counter on images.
type dockerCoordinator struct {
*dockerCoordinatorConfig
// imageLock is used to lock access to all images
imageLock sync.Mutex
// pullFutures is used to allow multiple callers to pull the same image but
// only have one request be sent to Docker
pullFutures map[string]*pullFuture
// imageRefCount is the reference count of image IDs
imageRefCount map[string]int
// deleteFuture is indexed by image ID and holds a cancelable delete future
deleteFuture map[string]context.CancelFunc
}
// NewDockerCoordinator returns a new Docker coordinator
func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
if config.client == nil {
return nil
}
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
imageRefCount: make(map[string]int),
deleteFuture: make(map[string]context.CancelFunc),
}
}
// GetDockerCoordinator returns the shared dockerCoordinator instance
func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
createCoordinator.Do(func() {
globalCoordinator = NewDockerCoordinator(config)
})
return globalCoordinator
}
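Note a design consequence of the sync.Once construction above: the first caller's dockerCoordinatorConfig wins, and the config argument passed by any later GetDockerCoordinator call is silently ignored.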
// PullImage is used to pull an image. It returns the pulled image's ID or any
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) {
// Lock while we look up the future
d.imageLock.Lock()
// Get the future
future, ok := d.pullFutures[image]
if !ok {
// Make the future
future = newPullFuture()
d.pullFutures[image] = future
go d.pullImageImpl(image, authOptions, future)
}
d.imageLock.Unlock()
// We unlock while we wait since this can take a while
id, err := future.wait().result()
// If we are cleaning up, we increment the reference count on the image
if err == nil && d.cleanup {
d.IncrementImageReference(id, image)
}
return id, err
}
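The lookup above is a duplicate-suppression ("single-flight") pattern: the first caller for an image creates the future and kicks off the pull; concurrent callers find the existing future and block on the same result. A self-contained sketch of the pattern, runnable outside Nomad (the sleep and the image ID are stand-ins for a real Docker pull):

package main

import (
	"fmt"
	"sync"
	"time"
)

type future struct {
	done chan struct{}
	id   string
	err  error
}

var (
	mu      sync.Mutex
	futures = make(map[string]*future)
)

// pull deduplicates concurrent requests for the same image: only the first
// caller starts the (simulated) pull, and everyone waits on its result.
func pull(image string) (string, error) {
	mu.Lock()
	f, ok := futures[image]
	if !ok {
		f = &future{done: make(chan struct{})}
		futures[image] = f
		go func() {
			time.Sleep(50 * time.Millisecond) // simulated docker pull
			f.id = "sha256:deadbeef"          // stand-in image ID
			close(f.done)
		}()
	}
	mu.Unlock()

	<-f.done
	return f.id, f.err
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id, _ := pull("busybox")
			fmt.Println(id)
		}()
	}
	wg.Wait() // all five goroutines print the same ID from a single pull
}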
// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
// Parse the repo and tag
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}
// Attempt to pull the image
var auth docker.AuthConfiguration
if authOptions != nil {
auth = *authOptions
}
err := d.client.PullImage(pullOptions, auth)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
future.set("", recoverablePullError(err, image))
return
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
dockerImage, err := d.client.InspectImage(image)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", image, err)
future.set("", recoverableErrTimeouts(err))
return
}
future.set(dockerImage.ID, nil)
return
}
// IncrementImageReference is used to increment an image reference count
func (d *dockerCoordinator) IncrementImageReference(id, image string) {
d.imageLock.Lock()
d.imageRefCount[id] += 1
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
// Cancel any pending delete
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image)
cancel()
}
d.imageLock.Unlock()
}
// RemoveImage removes the given image. If there are any errors removing the
// image, the remove is retried internally.
func (d *dockerCoordinator) RemoveImage(id string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()
references, ok := d.imageRefCount[id]
if !ok {
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id)
return
}
// Decrement the reference count
references--
d.imageRefCount[id] = references
d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references)
// Nothing to do
if references != 0 {
return
}
// Setup a future to delete the image
ctx, cancel := context.WithCancel(context.Background())
d.deleteFuture[id] = cancel
go d.removeImageImpl(id, ctx)
// Delete the key from the reference count
delete(d.imageRefCount, id)
}
// removeImageImpl is used to remove an image. It will wait the specified remove
// delay before removing the image. If the context is cancelled before then, the
// image removal is cancelled.
func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
// Sanity check
if !d.cleanup {
return
}
// Wait for the delay or a cancellation event
select {
case <-ctx.Done():
// We have been cancelled
return
case <-time.After(d.removeDelay):
}
for i := 0; i < 3; i++ {
err := d.client.RemoveImage(id)
if err == nil {
break
}
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return
}
// Retry on unknown errors
d.logger.Printf("[DEBUG] driver.docker: failed to remove image %q (attempt %d): %v", id, i+1, err)
}
d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
}
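The delay-or-cancel logic above is the standard context pattern for a cancellable timer. A minimal runnable illustration (the durations are placeholders for removeDelay and the re-reference window):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		select {
		case <-ctx.Done():
			fmt.Println("delete cancelled: image was referenced again")
			return
		case <-time.After(100 * time.Millisecond): // stands in for removeDelay
		}
		fmt.Println("removing image")
	}()

	// A new reference arrives inside the delay window, so cancel the delete,
	// just as IncrementImageReference does with the stored CancelFunc.
	time.Sleep(10 * time.Millisecond)
	cancel()

	time.Sleep(200 * time.Millisecond) // give the goroutine time to observe it
}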
// recoverablePullError wraps the error returned when trying to pull an image,
// marking whether the error is recoverable.
func recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

View File

@ -0,0 +1,211 @@
package driver
import (
"fmt"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
type mockImageClient struct {
pulled map[string]int
idToName map[string]string
removed map[string]int
pullDelay time.Duration
}
func newMockImageClient(idToName map[string]string, pullDelay time.Duration) *mockImageClient {
return &mockImageClient{
pulled: make(map[string]int),
removed: make(map[string]int),
idToName: idToName,
pullDelay: pullDelay,
}
}
func (m *mockImageClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error {
time.Sleep(m.pullDelay)
m.pulled[opts.Repository]++
return nil
}
func (m *mockImageClient) InspectImage(id string) (*docker.Image, error) {
return &docker.Image{
ID: m.idToName[id],
}, nil
}
func (m *mockImageClient) RemoveImage(id string) error {
m.removed[id]++
return nil
}
func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
image := "foo"
imageID := structs.GenerateUUID()
mapping := map[string]string{imageID: image}
// Add a delay so we can get multiple pulls queued up
mock := newMockImageClient(mapping, 10*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testLogger(),
cleanup: true,
client: mock,
removeDelay: 100 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
id := ""
for i := 0; i < 10; i++ {
id, _ = coordinator.PullImage(image, nil)
}
if p := mock.pulled[image]; p != 1 {
t.Fatalf("Got multiple pulls %d", p)
}
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
t.Fatalf("Got reference count %d; want %d", r, 10)
}
}
func TestDockerCoordinator_Pull_Remove(t *testing.T) {
image := "foo"
imageID := structs.GenerateUUID()
mapping := map[string]string{imageID: image}
// Add a delay so we can get multiple pulls queued up
mock := newMockImageClient(mapping, 10*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testLogger(),
cleanup: true,
client: mock,
removeDelay: 1 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
id := ""
for i := 0; i < 10; i++ {
id, _ = coordinator.PullImage(image, nil)
}
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
t.Fatalf("Got reference count %d; want %d", r, 10)
}
// Remove some
for i := 0; i < 8; i++ {
coordinator.RemoveImage(id)
}
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 2 {
t.Fatalf("Got reference count %d; want %d", r, 2)
}
// Remove all
for i := 0; i < 2; i++ {
coordinator.RemoveImage(id)
}
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
}
// Check that only one delete happened
testutil.WaitForResult(func() (bool, error) {
removes := mock.removed[id]
return removes == 1, fmt.Errorf("Wrong number of removes: %d", removes)
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
image := "foo"
imageID := structs.GenerateUUID()
mapping := map[string]string{imageID: image}
mock := newMockImageClient(mapping, 1*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testLogger(),
cleanup: true,
client: mock,
removeDelay: 1 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
// Pull image
id, _ := coordinator.PullImage(image, nil)
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 10)
}
// Remove image
coordinator.RemoveImage(id)
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
}
// Pull image again within delay
id, _ = coordinator.PullImage(image, nil)
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 0)
}
// Check that no delete happened
if removes := mock.removed[id]; removes != 0 {
t.Fatalf("Image deleted when it shouldn't have")
}
}
func TestDockerCoordinator_No_Cleanup(t *testing.T) {
image := "foo"
imageID := structs.GenerateUUID()
mapping := map[string]string{imageID: image}
mock := newMockImageClient(mapping, 1*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testLogger(),
cleanup: false,
client: mock,
removeDelay: 1 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
// Pull image
id, _ := coordinator.PullImage(image, nil)
// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 10)
}
// Remove image
coordinator.RemoveImage(id)
// Check that no delete happened
if removes := mock.removed[id]; removes != 0 {
t.Fatalf("Image deleted when it shouldn't have")
}
}

View File

@ -55,7 +55,7 @@ func dockerTask() (*structs.Task, int, int) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/nc",
"args": []string{"-l", "127.0.0.1", "-p", "0"},
},
@ -93,28 +93,41 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle
return dockerSetupWithClient(t, task, client)
}
func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Client) (*docker.Client, DriverHandle, func()) {
func testDockerDriverContexts(t *testing.T, task *structs.Task) *testContext {
tctx := testDriverContexts(t, task)
tctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
// Drop the delay
tctx.DriverCtx.config.Options = make(map[string]string)
tctx.DriverCtx.config.Options[dockerImageRemoveDelayConfigOption] = "1s"
return tctx
}
func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Client) (*docker.Client, DriverHandle, func()) {
tctx := testDockerDriverContexts(t, task)
//tctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
driver := NewDockerDriver(tctx.DriverCtx)
copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar")
_, err := driver.Prestart(tctx.ExecCtx, task)
res, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
handle, err := driver.Start(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack())
}
if handle == nil {
tctx.AllocDir.Destroy()
t.Fatalf("handle is nil\nStack\n%s", debug.Stack())
}
cleanup := func() {
driver.Cleanup(tctx.ExecCtx, res)
handle.Kill()
tctx.AllocDir.Destroy()
}
@ -136,8 +149,8 @@ func newTestDockerClient(t *testing.T) *docker.Client {
// This test should always pass, even if docker daemon is not available
func TestDockerDriver_Fingerprint(t *testing.T) {
ctx := testDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources})
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources})
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
node := &structs.Node{
@ -165,7 +178,7 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
Name: "nc-demo",
Driver: "docker",
Config: map[string]interface{}{
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"image": "busybox",
"command": "/bin/nc",
"args": []string{"-l", "127.0.0.1", "-p", "0"},
@ -177,8 +190,8 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
Resources: basicResources,
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
@ -212,7 +225,7 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
Name: "nc-demo",
Driver: "docker",
Config: map[string]interface{}{
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"image": "busybox",
"command": "/bin/echo",
"args": []string{"hello"},
@ -255,7 +268,7 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/echo",
"args": []string{
"hello",
@ -271,8 +284,8 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) {
},
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
@ -339,8 +352,8 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) {
},
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
@ -371,7 +384,7 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/sh",
"args": []string{
"-c",
@ -389,8 +402,8 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
},
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
@ -435,7 +448,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/sleep",
"args": []string{"10"},
},
@ -483,8 +496,8 @@ func TestDockerDriver_StartN(t *testing.T) {
// Let's spin up a bunch of things
for idx, task := range taskList {
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
@ -523,15 +536,15 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
task1, _, _ := dockerTask()
task1.Config["image"] = "busybox"
task1.Config["load"] = []string{"busybox.tar"}
task1.Config["load"] = "busybox.tar"
task2, _, _ := dockerTask()
task2.Config["image"] = "busybox:musl"
task2.Config["load"] = []string{"busybox_musl.tar"}
task2.Config["load"] = "busybox_musl.tar"
task3, _, _ := dockerTask()
task3.Config["image"] = "busybox:glibc"
task3.Config["load"] = []string{"busybox_glibc.tar"}
task3.Config["load"] = "busybox_glibc.tar"
taskList := []*structs.Task{task1, task2, task3}
@ -541,8 +554,8 @@ func TestDockerDriver_StartNVersions(t *testing.T) {
// Let's spin up a bunch of things
for idx, task := range taskList {
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
@ -599,7 +612,7 @@ func TestDockerDriver_NetworkMode_Host(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/nc",
"args": []string{"-l", "127.0.0.1", "-p", "0"},
"network_mode": expected,
@ -649,7 +662,7 @@ func TestDockerDriver_NetworkAliases_Bridge(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/nc",
"args": []string{"-l", "127.0.0.1", "-p", "0"},
"network_mode": network.Name,
@ -708,9 +721,9 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) {
task, _, _ := dockerTask()
task.Config["force_pull"] = "nothing"
ctx := testDriverContexts(t, task)
ctx := testDockerDriverContexts(t, task)
defer ctx.AllocDir.Destroy()
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
driver := NewDockerDriver(ctx.DriverCtx)
if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil {
@ -897,7 +910,7 @@ func TestDockerDriver_User(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/sleep",
"args": []string{"10000"},
},
@ -915,8 +928,8 @@ func TestDockerDriver_User(t *testing.T) {
t.SkipNow()
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
driver := NewDockerDriver(ctx.DriverCtx)
defer ctx.AllocDir.Destroy()
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
@ -945,7 +958,7 @@ func TestDockerDriver_CleanupContainer(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/echo",
"args": []string{"hello"},
},
@ -993,7 +1006,7 @@ func TestDockerDriver_Stats(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/sleep",
"args": []string{"100"},
},
@ -1045,7 +1058,7 @@ func TestDockerDriver_Signal(t *testing.T) {
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "/bin/sh",
"args": []string{"local/test.sh"},
},
@ -1059,8 +1072,8 @@ func TestDockerDriver_Signal(t *testing.T) {
},
}
ctx := testDriverContexts(t, task)
ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
ctx := testDockerDriverContexts(t, task)
//ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"}
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
@ -1140,7 +1153,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": []string{"busybox.tar"},
"load": "busybox.tar",
"command": "touch",
"args": []string{containerFile},
"volumes": []string{fmt.Sprintf("%s:${VOL_PATH}", hostpath)},
@ -1294,7 +1307,7 @@ func TestDockerDriver_Cleanup(t *testing.T) {
"image": imageName,
},
}
tctx := testDriverContexts(t, task)
tctx := testDockerDriverContexts(t, task)
defer tctx.AllocDir.Destroy()
// Run Prestart
@ -1319,9 +1332,15 @@ func TestDockerDriver_Cleanup(t *testing.T) {
}
// Ensure image was removed
if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil {
t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName)
}
tu.WaitForResult(func() (bool, error) {
if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil {
return false, fmt.Errorf("image exists but should have been removed. Does another %v container exist?", imageName)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// The image doesn't exist which shouldn't be an error when calling
// Cleanup, so call it again to make sure.

View File

@ -409,6 +409,11 @@ options](/docs/agent/configuration/client.html#options):
* `docker.cleanup.image` Defaults to `true`. Changing this to `false` will
prevent Nomad from removing images from stopped tasks.
* `docker.cleanup.image.delay` A time duration that defaults to `3m`. The delay
controls how long Nomad will wait after an image becomes unused before deleting
it. If a task that uses the same image is received within the delay, the image
will be reused.
* `docker.volumes.enabled`: Defaults to `true`. Allows tasks to bind host paths
(`volumes`) inside their container. Binding relative paths is always allowed
and will be resolved relative to the allocation's directory.