From d590123637bf3b87e71c6f8ffd92c9184a4b7805 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Mon, 26 Jun 2023 15:21:42 -0500 Subject: [PATCH] drivers/docker: refactor use of clients in docker driver (#17731) * drivers/docker: refactor use of clients in docker driver This PR refactors how we manage the two underlying clients used by the docker driver for communicating with the docker daemon. We keep two clients - one with a hard-coded timeout that applies to all operations no matter what, intended for use with short lived / async calls to docker. The other has no timeout and is the responsibility of the caller to set a context that will ensure the call eventually terminates. The use of these two clients has been confusing and mistakes were made in a number of places where calls were making use of the wrong client. This PR makes it so that a user must explicitly call a function to get the client that makes sense for that use case. Fixes #17023 * cr: followup items --- .changelog/17731.txt | 3 + drivers/docker/config.go | 2 +- drivers/docker/driver.go | 181 ++++++++++++++++----------- drivers/docker/driver_unix_test.go | 17 +-- drivers/docker/fingerprint.go | 10 +- drivers/docker/handle.go | 46 ++++--- drivers/docker/network.go | 21 ++-- drivers/docker/reconcile_dangling.go | 30 +++-- drivers/docker/stats.go | 2 +- 9 files changed, 188 insertions(+), 124 deletions(-) create mode 100644 .changelog/17731.txt diff --git a/.changelog/17731.txt b/.changelog/17731.txt new file mode 100644 index 000000000..00da2bf73 --- /dev/null +++ b/.changelog/17731.txt @@ -0,0 +1,3 @@ +```release-note:bug +drivers/docker: Fixed a bug where long-running docker operations would incorrectly timeout +``` diff --git a/drivers/docker/config.go b/drivers/docker/config.go index b4fa249e3..7267d9a4b 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -765,7 +765,7 @@ func (d *Driver) SetConfig(c *base.Config) error { d.clientConfig = c.AgentConfig.Driver } - dockerClient, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return fmt.Errorf("failed to get docker client: %v", err) } diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 0c3227de1..fb71f5b1e 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/nomad/drivers/shared/eventer" "github.com/hashicorp/nomad/drivers/shared/hostnames" "github.com/hashicorp/nomad/drivers/shared/resolvconf" + "github.com/hashicorp/nomad/helper" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" @@ -39,19 +40,6 @@ import ( ) var ( - // createClientsLock is a lock that protects reading/writing global client - // variables - createClientsLock sync.Mutex - - // client is a docker client with a timeout of 5 minutes. This is for doing - // all operations with the docker daemon besides which are not long running - // such as creating, killing containers, etc. - client *docker.Client - - // waitClient is a docker client with no timeouts. 
This is used for long - // running operations such as waiting on containers and collect stats - waitClient *docker.Client - dockerTransientErrs = []string{ "Client.Timeout exceeded while awaiting headers", "EOF", @@ -159,6 +147,10 @@ type Driver struct { detected bool detectedLock sync.RWMutex + dockerClientLock sync.Mutex + dockerClient *docker.Client // for most docker api calls (use getDockerClient()) + infinityClient *docker.Client // for wait and stop calls (use getInfinityClient()) + danglingReconciler *containerReconciler cpusetFixer CpusetFixer } @@ -229,12 +221,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("failed to decode driver task state: %v", err) } - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { - return fmt.Errorf("failed to get docker client: %v", err) + return fmt.Errorf("failed to get docker client: %w", err) } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + infinityClient, err := d.getInfinityClient() + if err != nil { + return fmt.Errorf("failed to get docker long operations client: %w", err) + } + + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: handleState.ContainerID, }) if err != nil { @@ -242,8 +239,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { } h := &taskHandle{ - client: client, - waitClient: waitClient, + dockerClient: dockerClient, + infinityClient: infinityClient, logger: d.logger.With("container_id", container.ID), task: handle.Config, containerID: container.ID, @@ -261,14 +258,14 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now()) if err != nil { - if err := client.StopContainer(handleState.ContainerID, 0); err != nil { + if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil { d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) } return fmt.Errorf("failed to setup replacement docker logger: %v", err) } if err := handle.SetDriverState(h.buildState()); err != nil { - if err := client.StopContainer(handleState.ContainerID, 0); err != nil { + if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil { d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) } return fmt.Errorf("failed to store driver state: %v", err) @@ -315,13 +312,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - // Initialize docker API clients - client, _, err := d.dockerClients() + // we'll need the normal docker client + dockerClient, err := d.getDockerClient() if err != nil { - return nil, nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) + return nil, nil, fmt.Errorf("Failed to create docker client: %v", err) } - id, err := d.createImage(cfg, &driverConfig, client) + // and also the long operations client + infinityClient, err := d.getInfinityClient() + if err != nil { + return nil, nil, fmt.Errorf("Failed to create long operations docker client: %v", err) + } + + id, err := d.createImage(cfg, &driverConfig, dockerClient) if err != nil { return nil, nil, err } @@ -342,10 +345,10 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive startAttempts := 0 CREATE: - 
container, err := d.createContainer(client, containerCfg, driverConfig.Image) + container, err := d.createContainer(dockerClient, containerCfg, driverConfig.Image) if err != nil { d.logger.Error("failed to create container", "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: containerCfg.Name, Force: true, }) @@ -361,7 +364,7 @@ CREATE: // Start the container if err := d.startContainer(container); err != nil { d.logger.Error("failed to start container", "container_id", container.ID, "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) @@ -376,17 +379,17 @@ CREATE: // Inspect container to get all of the container metadata as much of the // metadata (eg networking) isn't populated until the container is started - runningContainer, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + runningContainer, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: container.ID, }) if err != nil { - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) msg := "failed to inspect started container" d.logger.Error(msg, "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) @@ -419,7 +422,7 @@ CREATE: dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0)) if err != nil { d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID) - client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) return nil, nil, err } } @@ -435,8 +438,8 @@ CREATE: // Return a driver handle h := &taskHandle{ - client: client, - waitClient: waitClient, + dockerClient: dockerClient, + infinityClient: infinityClient, dlogger: dlogger, dloggerPluginClient: pluginClient, logger: d.logger.With("container_id", container.ID), @@ -455,7 +458,7 @@ CREATE: dlogger.Stop() pluginClient.Kill() } - client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) return nil, nil, err } @@ -548,10 +551,15 @@ CREATE: // startContainer starts the passed container. It attempts to handle any // transient Docker errors. 
func (d *Driver) startContainer(c *docker.Container) error { + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + // Start a container attempted := 0 START: - startErr := client.StartContainer(c.ID, c.HostConfig) + startErr := dockerClient.StartContainer(c.ID, c.HostConfig) if startErr == nil || strings.Contains(startErr.Error(), "Container already running") { return nil } @@ -691,7 +699,12 @@ func (d *Driver) loadImage(task *drivers.TaskConfig, driverConfig *TaskConfig, c } func (d *Driver) convertAllocPathsForWindowsLCOW(task *drivers.TaskConfig, image string) error { - imageConfig, err := client.InspectImage(image) + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + + imageConfig, err := dockerClient.InspectImage(image) if err != nil { return fmt.Errorf("the image does not exist: %v", err) } @@ -763,7 +776,7 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf func (d *Driver) findPauseContainer(allocID string) (string, error) { - _, dockerClient, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return "", err } @@ -799,7 +812,7 @@ func (d *Driver) findPauseContainer(allocID string) (string, error) { // tracking. Basically just scan all containers and pull the ID from anything // that has the Nomad Label and has Name with prefix "/nomad_init_". func (d *Driver) recoverPauseContainers(ctx context.Context) { - _, dockerClient, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { d.logger.Error("failed to recover pause containers", "error", err) return @@ -1459,11 +1472,11 @@ func (d *Driver) detectIP(c *docker.Container, driverConfig *TaskConfig) (string // if the container is dead or can't be found. 
func (d *Driver) containerByName(name string) (*docker.Container, error) { - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return nil, err } - containers, err := client.ListContainers(docker.ListContainersOptions{ + containers, err := dockerClient.ListContainers(docker.ListContainersOptions{ All: true, }) if err != nil { @@ -1495,7 +1508,7 @@ OUTER: return nil, nil } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: shimContainer.ID, }) if err != nil { @@ -1563,7 +1576,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { return drivers.ErrTaskNotFound } - c, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + + c, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) if err != nil { @@ -1579,13 +1597,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { if !force { return fmt.Errorf("must call StopTask for the given task before Destroy or set force to true") } - if err := h.client.StopContainer(h.containerID, 0); err != nil { + if err := dockerClient.StopContainer(h.containerID, 0); err != nil { h.logger.Warn("failed to stop container during destroy", "error", err) } } if h.removeContainerOnExit { - if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil { + if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil { h.logger.Error("error removing container", "error", err) } } else { @@ -1621,7 +1639,12 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { return nil, drivers.ErrTaskNotFound } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + dockerClient, err := d.getDockerClient() + if err != nil { + return nil, err + } + + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) if err != nil { @@ -1723,7 +1746,13 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri Container: h.containerID, Context: ctx, } - exec, err := h.client.CreateExec(createExecOpts) + + dockerClient, err := d.getDockerClient() + if err != nil { + return nil, err + } + + exec, err := dockerClient.CreateExec(createExecOpts) if err != nil { return nil, fmt.Errorf("failed to create exec object: %v", err) } @@ -1739,7 +1768,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri if !ok { return } - client.ResizeExecTTY(exec.ID, s.Height, s.Width) + dockerClient.ResizeExecTTY(exec.ID, s.Height, s.Width) } } }() @@ -1758,7 +1787,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri ErrorStream: opts.Stderr, Context: ctx, } - if err := client.StartExec(exec.ID, startOpts); err != nil { + if err := dockerClient.StartExec(exec.ID, startOpts); err != nil { return nil, fmt.Errorf("failed to start exec: %v", err) } @@ -1769,7 +1798,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri start := time.Now() var res *docker.ExecInspect for (res == nil || res.Running) && time.Since(start) <= execTerminatingTimeout { - res, err = client.InspectExec(exec.ID) + res, err = 
dockerClient.InspectExec(exec.ID) if err != nil { return nil, fmt.Errorf("failed to inspect exec result: %v", err) } @@ -1785,37 +1814,37 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri }, nil } -// dockerClients creates two *docker.Client, one for long running operations and -// the other for shorter operations. In test / dev mode we can use ENV vars to -// connect to the docker daemon. In production mode we will read docker.endpoint -// from the config file. -func (d *Driver) dockerClients() (*docker.Client, *docker.Client, error) { - createClientsLock.Lock() - defer createClientsLock.Unlock() +func (d *Driver) getOrCreateClient(timeout time.Duration) (*docker.Client, error) { + var ( + client *docker.Client + err error + ) - if client != nil && waitClient != nil { - return client, waitClient, nil - } - - var err error - - // Only initialize the client if it hasn't yet been done - if client == nil { - client, err = d.newDockerClient(dockerTimeout) - if err != nil { - return nil, nil, err + helper.WithLock(&d.dockerClientLock, func() { + if timeout == 0 { + if d.infinityClient == nil { + d.infinityClient, err = d.newDockerClient(0) + } + client = d.infinityClient + } else { + if d.dockerClient == nil { + d.dockerClient, err = d.newDockerClient(timeout) + } + client = d.dockerClient } - } + }) - // Only initialize the waitClient if it hasn't yet been done - if waitClient == nil { - waitClient, err = d.newDockerClient(0 * time.Minute) - if err != nil { - return nil, nil, err - } - } + return client, err +} - return client, waitClient, nil +// getInfinityClient creates a docker API client with no timeout. +func (d *Driver) getInfinityClient() (*docker.Client, error) { + return d.getOrCreateClient(0) +} + +// getDockerClient creates a docker API client with a hard-coded timeout. 
+func (d *Driver) getDockerClient() (*docker.Client, error) { + return d.getOrCreateClient(dockerTimeout) } // newDockerClient creates a new *docker.Client with a configurable timeout diff --git a/drivers/docker/driver_unix_test.go b/drivers/docker/driver_unix_test.go index 2cd8387eb..1056076e0 100644 --- a/drivers/docker/driver_unix_test.go +++ b/drivers/docker/driver_unix_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" tu "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -125,25 +126,25 @@ func TestDockerDriver_NetworkMode_Host(t *testing.T) { copyImage(t, task.TaskDir(), "busybox.tar") _, _, err := d.StartTask(task) - require.NoError(t, err) + must.NoError(t, err) - require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) defer d.DestroyTask(task.ID, true) dockerDriver, ok := d.Impl().(*Driver) - require.True(t, ok) + must.True(t, ok) handle, ok := dockerDriver.tasks.Get(task.ID) - require.True(t, ok) + must.True(t, ok) + + client := newTestDockerClient(t) container, err := client.InspectContainer(handle.containerID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) actual := container.HostConfig.NetworkMode - require.Equal(t, expected, actual) + must.Eq(t, expected, actual) } func TestDockerDriver_CPUCFSPeriod(t *testing.T) { diff --git a/drivers/docker/fingerprint.go b/drivers/docker/fingerprint.go index c67254707..0ee93d645 100644 --- a/drivers/docker/fingerprint.go +++ b/drivers/docker/fingerprint.go @@ -98,7 +98,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { return fp } - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { if d.fingerprintSuccessful() { d.logger.Info("failed to initialize client", "error", err) @@ -110,10 +110,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } - env, err := client.Version() + env, err := dockerClient.Version() if err != nil { if d.fingerprintSuccessful() { - d.logger.Debug("could not connect to docker daemon", "endpoint", client.Endpoint(), "error", err) + d.logger.Debug("could not connect to docker daemon", "endpoint", dockerClient.Endpoint(), "error", err) } d.setFingerprintFailure() @@ -143,7 +143,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fp.Attributes["driver.docker.volumes.enabled"] = pstructs.NewBoolAttribute(true) } - if nets, err := client.ListNetworks(); err != nil { + if nets, err := dockerClient.ListNetworks(); err != nil { d.logger.Warn("error discovering bridge IP", "error", err) } else { for _, n := range nets { @@ -167,7 +167,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } - if dockerInfo, err := client.Info(); err != nil { + if dockerInfo, err := dockerClient.Info(); err != nil { d.logger.Warn("failed to get Docker system info", "error", err) } else { runtimeNames := make([]string, 0, len(dockerInfo.Runtimes)) diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 67f737ed4..9ad0e5598 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -25,8 +25,17 @@ import ( ) type taskHandle struct { - client *docker.Client - waitClient *docker.Client + // dockerClient is useful for normal docker API calls. It should be used + // for all calls that aren't Wait() or Stop() (and their variations). 
+ dockerClient *docker.Client + + // infinityClient is useful for + // - the Wait docker API call(s) (no limit on container lifetime) + // - the Stop docker API call(s) (context with task kill_timeout required) + // Do not use this client for any other docker API calls, instead use the + // normal dockerClient which includes a default timeout. + infinityClient *docker.Client + logger hclog.Logger dlogger docklog.DockerLogger dloggerPluginClient *plugin.Client @@ -80,7 +89,7 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv Container: h.containerID, Context: ctx, } - exec, err := h.client.CreateExec(createExecOpts) + exec, err := h.dockerClient.CreateExec(createExecOpts) if err != nil { return nil, err } @@ -95,12 +104,12 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv ErrorStream: stderr, Context: ctx, } - if err := client.StartExec(exec.ID, startOpts); err != nil { + if err := h.dockerClient.StartExec(exec.ID, startOpts); err != nil { return nil, err } execResult.Stdout = stdout.Bytes() execResult.Stderr = stderr.Bytes() - res, err := client.InspectExec(exec.ID) + res, err := h.dockerClient.InspectExec(exec.ID) if err != nil { return execResult, err } @@ -120,13 +129,14 @@ func (h *taskHandle) Signal(ctx context.Context, s os.Signal) error { // MacOS signals to the correct signal number for docker. Or we change the // interface to take a signal string and leave it up to driver to map? - dockerSignal := docker.Signal(sysSig) opts := docker.KillContainerOptions{ ID: h.containerID, - Signal: dockerSignal, + Signal: docker.Signal(sysSig), Context: ctx, } - return h.client.KillContainer(opts) + + // remember Kill just means send a signal; this is not the complex StopContainer case + return h.dockerClient.KillContainer(opts) } // parseSignal interprets the signal name into an os.Signal. If no name is @@ -159,7 +169,13 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error { // Signal is used to kill the container with the desired signal before // calling StopContainer if signal == "" { - err = h.client.StopContainer(h.containerID, uint(killTimeout.Seconds())) + // give the context timeout some wiggle room beyond the kill timeout + // docker will use, so we can happy path even in the force kill case + graciousTimeout := killTimeout + dockerTimeout + ctx, cancel := context.WithTimeout(context.Background(), graciousTimeout) + defer cancel() + apiTimeout := uint(killTimeout.Seconds()) + err = h.infinityClient.StopContainerWithContext(h.containerID, apiTimeout, ctx) } else { ctx, cancel := context.WithTimeout(context.Background(), killTimeout) defer cancel() @@ -191,8 +207,8 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error { case <-ctx.Done(): } - // Stop the container - err = h.client.StopContainer(h.containerID, 0) + // Stop the container forcefully. 
+ err = h.dockerClient.StopContainer(h.containerID, 0) } if err != nil { @@ -230,7 +246,7 @@ func (h *taskHandle) shutdownLogger() { func (h *taskHandle) run() { defer h.shutdownLogger() - exitCode, werr := h.waitClient.WaitContainer(h.containerID) + exitCode, werr := h.infinityClient.WaitContainer(h.containerID) if werr != nil { h.logger.Error("failed to wait for container; already terminated") } @@ -239,7 +255,7 @@ func (h *taskHandle) run() { werr = fmt.Errorf("Docker container exited with non-zero exit code: %d", exitCode) } - container, ierr := h.waitClient.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, ierr := h.dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) oom := false @@ -264,8 +280,8 @@ func (h *taskHandle) run() { close(h.doneCh) // Stop the container just incase the docker daemon's wait returned - // incorrectly - if err := h.client.StopContainer(h.containerID, 0); err != nil { + // incorrectly. + if err := h.dockerClient.StopContainer(h.containerID, 0); err != nil { _, noSuchContainer := err.(*docker.NoSuchContainer) _, containerNotRunning := err.(*docker.ContainerNotRunning) if !containerNotRunning && !noSuchContainer { diff --git a/drivers/docker/network.go b/drivers/docker/network.go index e92acd3e5..5e66ff382 100644 --- a/drivers/docker/network.go +++ b/drivers/docker/network.go @@ -26,7 +26,7 @@ const ( func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreateRequest) (*drivers.NetworkIsolationSpec, bool, error) { // Initialize docker API clients - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return nil, false, fmt.Errorf("failed to connect to docker daemon: %s", err) } @@ -68,7 +68,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate return specFromContainer(container, createSpec.Hostname), false, nil } - container, err = d.createContainer(client, *config, d.config.InfraImage) + container, err = d.createContainer(dockerClient, *config, d.config.InfraImage) if err != nil { return nil, false, err } @@ -79,7 +79,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate // until the container is started, InspectContainerWithOptions // returns a mostly-empty struct - container, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: container.ID, }) if err != nil { @@ -122,17 +122,17 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp // let the background reconciliation keep trying d.pauseContainers.remove(id) - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return fmt.Errorf("failed to connect to docker daemon: %s", err) } timeout := uint(1) // this is the pause container, just kill it fast - if err := client.StopContainerWithContext(id, timeout, d.ctx); err != nil { + if err := dockerClient.StopContainerWithContext(id, timeout, d.ctx); err != nil { d.logger.Warn("failed to stop pause container", "id", id, "error", err) } - if err := client.RemoveContainer(docker.RemoveContainerOptions{ + if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ Force: true, ID: id, }); err != nil { @@ -144,7 +144,7 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp // The Docker image ID is needed in order to correctly update the image // reference 
count. Any error finding this, however, should not result // in an error shutting down the allocrunner. - dockerImage, err := client.InspectImage(d.config.InfraImage) + dockerImage, err := dockerClient.InspectImage(d.config.InfraImage) if err != nil { d.logger.Warn("InspectImage failed for infra_image container destroy", "image", d.config.InfraImage, "error", err) @@ -187,6 +187,11 @@ func (d *Driver) createSandboxContainerConfig(allocID string, createSpec *driver func (d *Driver) pullInfraImage(allocID string) error { repo, tag := parseDockerImage(d.config.InfraImage) + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + // There's a (narrow) time-of-check-time-of-use race here. If we call // InspectImage and then a concurrent task shutdown happens before we call // IncrementImageReference, we could end up removing the image, and it @@ -194,7 +199,7 @@ func (d *Driver) pullInfraImage(allocID string) error { d.coordinator.imageLock.Lock() if tag != "latest" { - dockerImage, err := client.InspectImage(d.config.InfraImage) + dockerImage, err := dockerClient.InspectImage(d.config.InfraImage) if err != nil { d.logger.Debug("InspectImage failed for infra_image container pull", "image", d.config.InfraImage, "error", err) diff --git a/drivers/docker/reconcile_dangling.go b/drivers/docker/reconcile_dangling.go index d75a5268e..191684fbd 100644 --- a/drivers/docker/reconcile_dangling.go +++ b/drivers/docker/reconcile_dangling.go @@ -22,10 +22,10 @@ import ( // creation API call fail with a network error. containerReconciler // scans for these untracked containers and kill them. type containerReconciler struct { - ctx context.Context - config *ContainerGCConfig - client *docker.Client - logger hclog.Logger + ctx context.Context + config *ContainerGCConfig + logger hclog.Logger + getClient func() (*docker.Client, error) isDriverHealthy func() bool trackedContainers func() *set.Set[string] @@ -36,10 +36,10 @@ type containerReconciler struct { func newReconciler(d *Driver) *containerReconciler { return &containerReconciler{ - ctx: d.ctx, - config: &d.config.GC.DanglingContainers, - client: client, - logger: d.logger, + ctx: d.ctx, + config: &d.config.GC.DanglingContainers, + getClient: d.getDockerClient, + logger: d.logger, isDriverHealthy: func() bool { return d.previouslyDetected() && d.fingerprintSuccessful() }, trackedContainers: d.trackedContainers, @@ -109,9 +109,14 @@ func (r *containerReconciler) removeDanglingContainersIteration() error { return nil } + dockerClient, err := r.getClient() + if err != nil { + return err + } + for _, id := range untracked.Slice() { ctx, cancel := r.dockerAPIQueryContext() - err := client.RemoveContainer(docker.RemoveContainerOptions{ + err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ Context: ctx, ID: id, Force: true, @@ -135,7 +140,12 @@ func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cuto ctx, cancel := r.dockerAPIQueryContext() defer cancel() - cc, err := client.ListContainers(docker.ListContainersOptions{ + dockerClient, err := r.getClient() + if err != nil { + return nil, err + } + + cc, err := dockerClient.ListContainers(docker.ListContainersOptions{ Context: ctx, All: false, // only reconcile running containers }) diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index 3ca614fa5..23097ee99 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -131,7 +131,7 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte } // Stats blocks 
until an error has occurred, or doneCh has been closed - if err := h.client.Stats(statsOpts); err != nil && err != io.ErrClosedPipe { + if err := h.dockerClient.Stats(statsOpts); err != nil && err != io.ErrClosedPipe { // An error occurred during stats collection, retry with backoff h.logger.Debug("error collecting stats from container", "error", err)
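
The core of this change is the pair of accessors that replace the package-level `client` / `waitClient` globals. Below is a minimal, self-contained sketch of that lazy, lock-guarded pattern; the `apiClient`, `newAPIClient`, and `defaultTimeout` names are illustrative stand-ins for `*docker.Client`, `newDockerClient`, and `dockerTimeout`, and a plain mutex stands in for `helper.WithLock`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// apiClient is a stand-in for *docker.Client.
type apiClient struct{ timeout time.Duration }

// newAPIClient is a stand-in for the driver's newDockerClient constructor.
func newAPIClient(timeout time.Duration) (*apiClient, error) {
	return &apiClient{timeout: timeout}, nil
}

// defaultTimeout plays the role of the driver's dockerTimeout constant.
const defaultTimeout = 5 * time.Minute

type driver struct {
	mu             sync.Mutex
	dockerClient   *apiClient // hard-coded timeout; most API calls
	infinityClient *apiClient // no timeout; Wait/Stop where the caller owns the context
}

// getOrCreateClient lazily builds and caches whichever client matches the
// requested timeout; a zero timeout selects the no-timeout client.
func (d *driver) getOrCreateClient(timeout time.Duration) (*apiClient, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	var err error
	if timeout == 0 {
		if d.infinityClient == nil {
			d.infinityClient, err = newAPIClient(0)
		}
		return d.infinityClient, err
	}
	if d.dockerClient == nil {
		d.dockerClient, err = newAPIClient(timeout)
	}
	return d.dockerClient, err
}

// Callers pick the client that matches their use case explicitly.
func (d *driver) getDockerClient() (*apiClient, error)   { return d.getOrCreateClient(defaultTimeout) }
func (d *driver) getInfinityClient() (*apiClient, error) { return d.getOrCreateClient(0) }

func main() {
	d := &driver{}
	short, _ := d.getDockerClient()
	long, _ := d.getInfinityClient()
	fmt.Println(short.timeout, long.timeout) // 5m0s 0s
}
```

Making the choice explicit at each call site is what prevents the original bug class: a long-running Wait or Stop can no longer silently inherit the five-minute client timeout.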
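
The Kill path in handle.go pairs the no-timeout client with a caller-supplied context whose deadline is padded beyond the grace period handed to the daemon, so a forced kill can still finish before the client gives up. A hedged sketch of that shape, with `stopContainer` standing in for go-dockerclient's `StopContainerWithContext`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// dockerTimeout mirrors the driver's hard-coded client timeout, reused here
// as padding on top of the task's kill_timeout.
const dockerTimeout = 5 * time.Minute

// stopContainer is an illustrative stand-in for
// infinityClient.StopContainerWithContext(id, apiTimeout, ctx).
func stopContainer(ctx context.Context, id string, apiTimeout uint) error {
	deadline, _ := ctx.Deadline()
	fmt.Printf("stop %s: daemon grace %ds, client deadline in %s\n",
		id, apiTimeout, time.Until(deadline).Round(time.Second))
	return nil
}

// stopWithGrace gives the context wiggle room beyond the kill timeout the
// daemon will use, so the happy path holds even when docker force-kills.
func stopWithGrace(id string, killTimeout time.Duration) error {
	graciousTimeout := killTimeout + dockerTimeout
	ctx, cancel := context.WithTimeout(context.Background(), graciousTimeout)
	defer cancel()

	return stopContainer(ctx, id, uint(killTimeout.Seconds()))
}

func main() {
	_ = stopWithGrace("example-container", 10*time.Second)
}
```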
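
reconcile_dangling.go likewise stops caching a client on the reconciler and instead holds a getter, so the client is resolved at each iteration and a failed client setup surfaces as an error instead of a stale or nil global. A simplified sketch under that assumption; `containerLister` and `listRunning` are invented stand-ins, not the real go-dockerclient API:

```go
package main

import "fmt"

// containerLister is a stand-in for the subset of *docker.Client the
// reconciler needs.
type containerLister struct{}

func (c *containerLister) listRunning() []string { return []string{"abc123", "def456"} }

type reconciler struct {
	// getClient is wired to the driver's getDockerClient at construction,
	// mirroring newReconciler(d *Driver) in the patch.
	getClient func() (*containerLister, error)
}

func (r *reconciler) removeDanglingContainersIteration() error {
	client, err := r.getClient()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %w", err)
	}
	for _, id := range client.listRunning() {
		// the real code cross-checks tracked containers and calls RemoveContainer
		fmt.Println("candidate dangling container:", id)
	}
	return nil
}

func main() {
	r := &reconciler{getClient: func() (*containerLister, error) { return &containerLister{}, nil }}
	if err := r.removeDanglingContainersIteration(); err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```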