diff --git a/.changelog/17731.txt b/.changelog/17731.txt new file mode 100644 index 000000000..00da2bf73 --- /dev/null +++ b/.changelog/17731.txt @@ -0,0 +1,3 @@ +```release-note:bug +drivers/docker: Fixed a bug where long-running docker operations would incorrectly timeout +``` diff --git a/drivers/docker/config.go b/drivers/docker/config.go index b4fa249e3..7267d9a4b 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -765,7 +765,7 @@ func (d *Driver) SetConfig(c *base.Config) error { d.clientConfig = c.AgentConfig.Driver } - dockerClient, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return fmt.Errorf("failed to get docker client: %v", err) } diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 0c3227de1..fb71f5b1e 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/nomad/drivers/shared/eventer" "github.com/hashicorp/nomad/drivers/shared/hostnames" "github.com/hashicorp/nomad/drivers/shared/resolvconf" + "github.com/hashicorp/nomad/helper" nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" @@ -39,19 +40,6 @@ import ( ) var ( - // createClientsLock is a lock that protects reading/writing global client - // variables - createClientsLock sync.Mutex - - // client is a docker client with a timeout of 5 minutes. This is for doing - // all operations with the docker daemon besides which are not long running - // such as creating, killing containers, etc. - client *docker.Client - - // waitClient is a docker client with no timeouts. This is used for long - // running operations such as waiting on containers and collect stats - waitClient *docker.Client - dockerTransientErrs = []string{ "Client.Timeout exceeded while awaiting headers", "EOF", @@ -159,6 +147,10 @@ type Driver struct { detected bool detectedLock sync.RWMutex + dockerClientLock sync.Mutex + dockerClient *docker.Client // for most docker api calls (use getDockerClient()) + infinityClient *docker.Client // for wait and stop calls (use getInfinityClient()) + danglingReconciler *containerReconciler cpusetFixer CpusetFixer } @@ -229,12 +221,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { return fmt.Errorf("failed to decode driver task state: %v", err) } - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { - return fmt.Errorf("failed to get docker client: %v", err) + return fmt.Errorf("failed to get docker client: %w", err) } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + infinityClient, err := d.getInfinityClient() + if err != nil { + return fmt.Errorf("failed to get docker long operations client: %w", err) + } + + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: handleState.ContainerID, }) if err != nil { @@ -242,8 +239,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { } h := &taskHandle{ - client: client, - waitClient: waitClient, + dockerClient: dockerClient, + infinityClient: infinityClient, logger: d.logger.With("container_id", container.ID), task: handle.Config, containerID: container.ID, @@ -261,14 +258,14 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now()) if err != nil { - if err := 
client.StopContainer(handleState.ContainerID, 0); err != nil { + if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil { d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) } return fmt.Errorf("failed to setup replacement docker logger: %v", err) } if err := handle.SetDriverState(h.buildState()); err != nil { - if err := client.StopContainer(handleState.ContainerID, 0); err != nil { + if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil { d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) } return fmt.Errorf("failed to store driver state: %v", err) @@ -315,13 +312,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg - // Initialize docker API clients - client, _, err := d.dockerClients() + // we'll need the normal docker client + dockerClient, err := d.getDockerClient() if err != nil { - return nil, nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) + return nil, nil, fmt.Errorf("Failed to create docker client: %v", err) } - id, err := d.createImage(cfg, &driverConfig, client) + // and also the long operations client + infinityClient, err := d.getInfinityClient() + if err != nil { + return nil, nil, fmt.Errorf("Failed to create long operations docker client: %v", err) + } + + id, err := d.createImage(cfg, &driverConfig, dockerClient) if err != nil { return nil, nil, err } @@ -342,10 +345,10 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive startAttempts := 0 CREATE: - container, err := d.createContainer(client, containerCfg, driverConfig.Image) + container, err := d.createContainer(dockerClient, containerCfg, driverConfig.Image) if err != nil { d.logger.Error("failed to create container", "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: containerCfg.Name, Force: true, }) @@ -361,7 +364,7 @@ CREATE: // Start the container if err := d.startContainer(container); err != nil { d.logger.Error("failed to start container", "container_id", container.ID, "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) @@ -376,17 +379,17 @@ CREATE: // Inspect container to get all of the container metadata as much of the // metadata (eg networking) isn't populated until the container is started - runningContainer, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + runningContainer, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: container.ID, }) if err != nil { - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) msg := "failed to inspect started container" d.logger.Error(msg, "error", err) - client.RemoveContainer(docker.RemoveContainerOptions{ + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ ID: container.ID, Force: true, }) @@ -419,7 +422,7 @@ CREATE: dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0)) if err != nil { d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID) - client.RemoveContainer(docker.RemoveContainerOptions{ID: 
container.ID, Force: true}) + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) return nil, nil, err } } @@ -435,8 +438,8 @@ CREATE: // Return a driver handle h := &taskHandle{ - client: client, - waitClient: waitClient, + dockerClient: dockerClient, + infinityClient: infinityClient, dlogger: dlogger, dloggerPluginClient: pluginClient, logger: d.logger.With("container_id", container.ID), @@ -455,7 +458,7 @@ CREATE: dlogger.Stop() pluginClient.Kill() } - client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) + dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) return nil, nil, err } @@ -548,10 +551,15 @@ CREATE: // startContainer starts the passed container. It attempts to handle any // transient Docker errors. func (d *Driver) startContainer(c *docker.Container) error { + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + // Start a container attempted := 0 START: - startErr := client.StartContainer(c.ID, c.HostConfig) + startErr := dockerClient.StartContainer(c.ID, c.HostConfig) if startErr == nil || strings.Contains(startErr.Error(), "Container already running") { return nil } @@ -691,7 +699,12 @@ func (d *Driver) loadImage(task *drivers.TaskConfig, driverConfig *TaskConfig, c } func (d *Driver) convertAllocPathsForWindowsLCOW(task *drivers.TaskConfig, image string) error { - imageConfig, err := client.InspectImage(image) + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + + imageConfig, err := dockerClient.InspectImage(image) if err != nil { return fmt.Errorf("the image does not exist: %v", err) } @@ -763,7 +776,7 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf func (d *Driver) findPauseContainer(allocID string) (string, error) { - _, dockerClient, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return "", err } @@ -799,7 +812,7 @@ func (d *Driver) findPauseContainer(allocID string) (string, error) { // tracking. Basically just scan all containers and pull the ID from anything // that has the Nomad Label and has Name with prefix "/nomad_init_". func (d *Driver) recoverPauseContainers(ctx context.Context) { - _, dockerClient, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { d.logger.Error("failed to recover pause containers", "error", err) return @@ -1459,11 +1472,11 @@ func (d *Driver) detectIP(c *docker.Container, driverConfig *TaskConfig) (string // if the container is dead or can't be found. 
func (d *Driver) containerByName(name string) (*docker.Container, error) { - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return nil, err } - containers, err := client.ListContainers(docker.ListContainersOptions{ + containers, err := dockerClient.ListContainers(docker.ListContainersOptions{ All: true, }) if err != nil { @@ -1495,7 +1508,7 @@ OUTER: return nil, nil } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: shimContainer.ID, }) if err != nil { @@ -1563,7 +1576,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { return drivers.ErrTaskNotFound } - c, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + + c, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) if err != nil { @@ -1579,13 +1597,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error { if !force { return fmt.Errorf("must call StopTask for the given task before Destroy or set force to true") } - if err := h.client.StopContainer(h.containerID, 0); err != nil { + if err := dockerClient.StopContainer(h.containerID, 0); err != nil { h.logger.Warn("failed to stop container during destroy", "error", err) } } if h.removeContainerOnExit { - if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil { + if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil { h.logger.Error("error removing container", "error", err) } } else { @@ -1621,7 +1639,12 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { return nil, drivers.ErrTaskNotFound } - container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{ + dockerClient, err := d.getDockerClient() + if err != nil { + return nil, err + } + + container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) if err != nil { @@ -1723,7 +1746,13 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri Container: h.containerID, Context: ctx, } - exec, err := h.client.CreateExec(createExecOpts) + + dockerClient, err := d.getDockerClient() + if err != nil { + return nil, err + } + + exec, err := dockerClient.CreateExec(createExecOpts) if err != nil { return nil, fmt.Errorf("failed to create exec object: %v", err) } @@ -1739,7 +1768,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri if !ok { return } - client.ResizeExecTTY(exec.ID, s.Height, s.Width) + dockerClient.ResizeExecTTY(exec.ID, s.Height, s.Width) } } }() @@ -1758,7 +1787,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri ErrorStream: opts.Stderr, Context: ctx, } - if err := client.StartExec(exec.ID, startOpts); err != nil { + if err := dockerClient.StartExec(exec.ID, startOpts); err != nil { return nil, fmt.Errorf("failed to start exec: %v", err) } @@ -1769,7 +1798,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri start := time.Now() var res *docker.ExecInspect for (res == nil || res.Running) && time.Since(start) <= execTerminatingTimeout { - res, err = client.InspectExec(exec.ID) + res, err = 
dockerClient.InspectExec(exec.ID) if err != nil { return nil, fmt.Errorf("failed to inspect exec result: %v", err) } @@ -1785,37 +1814,37 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri }, nil } -// dockerClients creates two *docker.Client, one for long running operations and -// the other for shorter operations. In test / dev mode we can use ENV vars to -// connect to the docker daemon. In production mode we will read docker.endpoint -// from the config file. -func (d *Driver) dockerClients() (*docker.Client, *docker.Client, error) { - createClientsLock.Lock() - defer createClientsLock.Unlock() +func (d *Driver) getOrCreateClient(timeout time.Duration) (*docker.Client, error) { + var ( + client *docker.Client + err error + ) - if client != nil && waitClient != nil { - return client, waitClient, nil - } - - var err error - - // Only initialize the client if it hasn't yet been done - if client == nil { - client, err = d.newDockerClient(dockerTimeout) - if err != nil { - return nil, nil, err + helper.WithLock(&d.dockerClientLock, func() { + if timeout == 0 { + if d.infinityClient == nil { + d.infinityClient, err = d.newDockerClient(0) + } + client = d.infinityClient + } else { + if d.dockerClient == nil { + d.dockerClient, err = d.newDockerClient(timeout) + } + client = d.dockerClient } - } + }) - // Only initialize the waitClient if it hasn't yet been done - if waitClient == nil { - waitClient, err = d.newDockerClient(0 * time.Minute) - if err != nil { - return nil, nil, err - } - } + return client, err +} - return client, waitClient, nil +// getInfinityClient creates a docker API client with no timeout. +func (d *Driver) getInfinityClient() (*docker.Client, error) { + return d.getOrCreateClient(0) +} + +// getDockerClient creates a docker API client with a hard-coded timeout. 
+func (d *Driver) getDockerClient() (*docker.Client, error) { + return d.getOrCreateClient(dockerTimeout) } // newDockerClient creates a new *docker.Client with a configurable timeout diff --git a/drivers/docker/driver_unix_test.go b/drivers/docker/driver_unix_test.go index 2cd8387eb..1056076e0 100644 --- a/drivers/docker/driver_unix_test.go +++ b/drivers/docker/driver_unix_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils" tu "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -125,25 +126,25 @@ func TestDockerDriver_NetworkMode_Host(t *testing.T) { copyImage(t, task.TaskDir(), "busybox.tar") _, _, err := d.StartTask(task) - require.NoError(t, err) + must.NoError(t, err) - require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) defer d.DestroyTask(task.ID, true) dockerDriver, ok := d.Impl().(*Driver) - require.True(t, ok) + must.True(t, ok) handle, ok := dockerDriver.tasks.Get(task.ID) - require.True(t, ok) + must.True(t, ok) + + client := newTestDockerClient(t) container, err := client.InspectContainer(handle.containerID) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) actual := container.HostConfig.NetworkMode - require.Equal(t, expected, actual) + must.Eq(t, expected, actual) } func TestDockerDriver_CPUCFSPeriod(t *testing.T) { diff --git a/drivers/docker/fingerprint.go b/drivers/docker/fingerprint.go index c67254707..0ee93d645 100644 --- a/drivers/docker/fingerprint.go +++ b/drivers/docker/fingerprint.go @@ -98,7 +98,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { return fp } - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { if d.fingerprintSuccessful() { d.logger.Info("failed to initialize client", "error", err) @@ -110,10 +110,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } - env, err := client.Version() + env, err := dockerClient.Version() if err != nil { if d.fingerprintSuccessful() { - d.logger.Debug("could not connect to docker daemon", "endpoint", client.Endpoint(), "error", err) + d.logger.Debug("could not connect to docker daemon", "endpoint", dockerClient.Endpoint(), "error", err) } d.setFingerprintFailure() @@ -143,7 +143,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { fp.Attributes["driver.docker.volumes.enabled"] = pstructs.NewBoolAttribute(true) } - if nets, err := client.ListNetworks(); err != nil { + if nets, err := dockerClient.ListNetworks(); err != nil { d.logger.Warn("error discovering bridge IP", "error", err) } else { for _, n := range nets { @@ -167,7 +167,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint { } } - if dockerInfo, err := client.Info(); err != nil { + if dockerInfo, err := dockerClient.Info(); err != nil { d.logger.Warn("failed to get Docker system info", "error", err) } else { runtimeNames := make([]string, 0, len(dockerInfo.Runtimes)) diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 67f737ed4..9ad0e5598 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -25,8 +25,17 @@ import ( ) type taskHandle struct { - client *docker.Client - waitClient *docker.Client + // dockerClient is useful for normal docker API calls. It should be used + // for all calls that aren't Wait() or Stop() (and their variations). 
+ dockerClient *docker.Client + + // infinityClient is useful for + // - the Wait docker API call(s) (no limit on container lifetime) + // - the Stop docker API call(s) (context with task kill_timeout required) + // Do not use this client for any other docker API calls, instead use the + // normal dockerClient which includes a default timeout. + infinityClient *docker.Client + logger hclog.Logger dlogger docklog.DockerLogger dloggerPluginClient *plugin.Client @@ -80,7 +89,7 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv Container: h.containerID, Context: ctx, } - exec, err := h.client.CreateExec(createExecOpts) + exec, err := h.dockerClient.CreateExec(createExecOpts) if err != nil { return nil, err } @@ -95,12 +104,12 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv ErrorStream: stderr, Context: ctx, } - if err := client.StartExec(exec.ID, startOpts); err != nil { + if err := h.dockerClient.StartExec(exec.ID, startOpts); err != nil { return nil, err } execResult.Stdout = stdout.Bytes() execResult.Stderr = stderr.Bytes() - res, err := client.InspectExec(exec.ID) + res, err := h.dockerClient.InspectExec(exec.ID) if err != nil { return execResult, err } @@ -120,13 +129,14 @@ func (h *taskHandle) Signal(ctx context.Context, s os.Signal) error { // MacOS signals to the correct signal number for docker. Or we change the // interface to take a signal string and leave it up to driver to map? - dockerSignal := docker.Signal(sysSig) opts := docker.KillContainerOptions{ ID: h.containerID, - Signal: dockerSignal, + Signal: docker.Signal(sysSig), Context: ctx, } - return h.client.KillContainer(opts) + + // remember Kill just means send a signal; this is not the complex StopContainer case + return h.dockerClient.KillContainer(opts) } // parseSignal interprets the signal name into an os.Signal. If no name is @@ -159,7 +169,13 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error { // Signal is used to kill the container with the desired signal before // calling StopContainer if signal == "" { - err = h.client.StopContainer(h.containerID, uint(killTimeout.Seconds())) + // give the context timeout some wiggle room beyond the kill timeout + // docker will use, so we can happy path even in the force kill case + graciousTimeout := killTimeout + dockerTimeout + ctx, cancel := context.WithTimeout(context.Background(), graciousTimeout) + defer cancel() + apiTimeout := uint(killTimeout.Seconds()) + err = h.infinityClient.StopContainerWithContext(h.containerID, apiTimeout, ctx) } else { ctx, cancel := context.WithTimeout(context.Background(), killTimeout) defer cancel() @@ -191,8 +207,8 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error { case <-ctx.Done(): } - // Stop the container - err = h.client.StopContainer(h.containerID, 0) + // Stop the container forcefully. 
+ err = h.dockerClient.StopContainer(h.containerID, 0) } if err != nil { @@ -230,7 +246,7 @@ func (h *taskHandle) shutdownLogger() { func (h *taskHandle) run() { defer h.shutdownLogger() - exitCode, werr := h.waitClient.WaitContainer(h.containerID) + exitCode, werr := h.infinityClient.WaitContainer(h.containerID) if werr != nil { h.logger.Error("failed to wait for container; already terminated") } @@ -239,7 +255,7 @@ func (h *taskHandle) run() { werr = fmt.Errorf("Docker container exited with non-zero exit code: %d", exitCode) } - container, ierr := h.waitClient.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, ierr := h.dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: h.containerID, }) oom := false @@ -264,8 +280,8 @@ func (h *taskHandle) run() { close(h.doneCh) // Stop the container just incase the docker daemon's wait returned - // incorrectly - if err := h.client.StopContainer(h.containerID, 0); err != nil { + // incorrectly. + if err := h.dockerClient.StopContainer(h.containerID, 0); err != nil { _, noSuchContainer := err.(*docker.NoSuchContainer) _, containerNotRunning := err.(*docker.ContainerNotRunning) if !containerNotRunning && !noSuchContainer { diff --git a/drivers/docker/network.go b/drivers/docker/network.go index e92acd3e5..5e66ff382 100644 --- a/drivers/docker/network.go +++ b/drivers/docker/network.go @@ -26,7 +26,7 @@ const ( func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreateRequest) (*drivers.NetworkIsolationSpec, bool, error) { // Initialize docker API clients - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return nil, false, fmt.Errorf("failed to connect to docker daemon: %s", err) } @@ -68,7 +68,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate return specFromContainer(container, createSpec.Hostname), false, nil } - container, err = d.createContainer(client, *config, d.config.InfraImage) + container, err = d.createContainer(dockerClient, *config, d.config.InfraImage) if err != nil { return nil, false, err } @@ -79,7 +79,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate // until the container is started, InspectContainerWithOptions // returns a mostly-empty struct - container, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{ + container, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ ID: container.ID, }) if err != nil { @@ -122,17 +122,17 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp // let the background reconciliation keep trying d.pauseContainers.remove(id) - client, _, err := d.dockerClients() + dockerClient, err := d.getDockerClient() if err != nil { return fmt.Errorf("failed to connect to docker daemon: %s", err) } timeout := uint(1) // this is the pause container, just kill it fast - if err := client.StopContainerWithContext(id, timeout, d.ctx); err != nil { + if err := dockerClient.StopContainerWithContext(id, timeout, d.ctx); err != nil { d.logger.Warn("failed to stop pause container", "id", id, "error", err) } - if err := client.RemoveContainer(docker.RemoveContainerOptions{ + if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ Force: true, ID: id, }); err != nil { @@ -144,7 +144,7 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp // The Docker image ID is needed in order to correctly update the image // reference 
count. Any error finding this, however, should not result // in an error shutting down the allocrunner. - dockerImage, err := client.InspectImage(d.config.InfraImage) + dockerImage, err := dockerClient.InspectImage(d.config.InfraImage) if err != nil { d.logger.Warn("InspectImage failed for infra_image container destroy", "image", d.config.InfraImage, "error", err) @@ -187,6 +187,11 @@ func (d *Driver) createSandboxContainerConfig(allocID string, createSpec *driver func (d *Driver) pullInfraImage(allocID string) error { repo, tag := parseDockerImage(d.config.InfraImage) + dockerClient, err := d.getDockerClient() + if err != nil { + return err + } + // There's a (narrow) time-of-check-time-of-use race here. If we call // InspectImage and then a concurrent task shutdown happens before we call // IncrementImageReference, we could end up removing the image, and it @@ -194,7 +199,7 @@ func (d *Driver) pullInfraImage(allocID string) error { d.coordinator.imageLock.Lock() if tag != "latest" { - dockerImage, err := client.InspectImage(d.config.InfraImage) + dockerImage, err := dockerClient.InspectImage(d.config.InfraImage) if err != nil { d.logger.Debug("InspectImage failed for infra_image container pull", "image", d.config.InfraImage, "error", err) diff --git a/drivers/docker/reconcile_dangling.go b/drivers/docker/reconcile_dangling.go index d75a5268e..191684fbd 100644 --- a/drivers/docker/reconcile_dangling.go +++ b/drivers/docker/reconcile_dangling.go @@ -22,10 +22,10 @@ import ( // creation API call fail with a network error. containerReconciler // scans for these untracked containers and kill them. type containerReconciler struct { - ctx context.Context - config *ContainerGCConfig - client *docker.Client - logger hclog.Logger + ctx context.Context + config *ContainerGCConfig + logger hclog.Logger + getClient func() (*docker.Client, error) isDriverHealthy func() bool trackedContainers func() *set.Set[string] @@ -36,10 +36,10 @@ type containerReconciler struct { func newReconciler(d *Driver) *containerReconciler { return &containerReconciler{ - ctx: d.ctx, - config: &d.config.GC.DanglingContainers, - client: client, - logger: d.logger, + ctx: d.ctx, + config: &d.config.GC.DanglingContainers, + getClient: d.getDockerClient, + logger: d.logger, isDriverHealthy: func() bool { return d.previouslyDetected() && d.fingerprintSuccessful() }, trackedContainers: d.trackedContainers, @@ -109,9 +109,14 @@ func (r *containerReconciler) removeDanglingContainersIteration() error { return nil } + dockerClient, err := r.getClient() + if err != nil { + return err + } + for _, id := range untracked.Slice() { ctx, cancel := r.dockerAPIQueryContext() - err := client.RemoveContainer(docker.RemoveContainerOptions{ + err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ Context: ctx, ID: id, Force: true, @@ -135,7 +140,12 @@ func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cuto ctx, cancel := r.dockerAPIQueryContext() defer cancel() - cc, err := client.ListContainers(docker.ListContainersOptions{ + dockerClient, err := r.getClient() + if err != nil { + return nil, err + } + + cc, err := dockerClient.ListContainers(docker.ListContainersOptions{ Context: ctx, All: false, // only reconcile running containers }) diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index 3ca614fa5..23097ee99 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -131,7 +131,7 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte } // Stats blocks 
until an error has occurred, or doneCh has been closed - if err := h.client.Stats(statsOpts); err != nil && err != io.ErrClosedPipe { + if err := h.dockerClient.Stats(statsOpts); err != nil && err != io.ErrClosedPipe { // An error occurred during stats collection, retry with backoff h.logger.Debug("error collecting stats from container", "error", err)
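The heart of this change replaces the package-level `client`/`waitClient` globals with two lazily created clients stored on the `Driver` and guarded by `dockerClientLock`: a finite-timeout client for ordinary API calls and a no-timeout "infinity" client for Wait/Stop calls that may legitimately outlive any fixed deadline. Below is a minimal standalone sketch of that caching pattern, assuming go-dockerclient's `NewClientFromEnv` and `SetTimeout`; the real code goes through `d.newDockerClient` and Nomad's `helper.WithLock`, so treat the names here as illustrative only.

```go
package clientcache

import (
	"sync"
	"time"

	docker "github.com/fsouza/go-dockerclient"
)

// clientCache lazily builds and reuses two docker API clients: one with a
// finite timeout for ordinary calls, and one with no timeout ("infinity")
// for Wait/Stop style calls that can run for the lifetime of a container.
type clientCache struct {
	lock           sync.Mutex
	dockerClient   *docker.Client // finite timeout
	infinityClient *docker.Client // no timeout
}

// newClient is a stand-in for Driver.newDockerClient: it builds a client
// from the environment and applies the given HTTP timeout (0 means none).
func newClient(timeout time.Duration) (*docker.Client, error) {
	c, err := docker.NewClientFromEnv()
	if err != nil {
		return nil, err
	}
	if timeout > 0 {
		c.SetTimeout(timeout)
	}
	return c, nil
}

// getOrCreate returns the cached client for the requested timeout class,
// creating it on first use. The mutex makes initialization race-free
// without any package-level globals.
func (c *clientCache) getOrCreate(timeout time.Duration) (*docker.Client, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	var err error
	if timeout == 0 {
		if c.infinityClient == nil {
			c.infinityClient, err = newClient(0)
		}
		return c.infinityClient, err
	}
	if c.dockerClient == nil {
		c.dockerClient, err = newClient(timeout)
	}
	return c.dockerClient, err
}
```

In the diff itself, thin wrappers (`getDockerClient` for the finite-timeout client, `getInfinityClient` for the timeout-free one) sit on top of `getOrCreateClient`, so every call site states explicitly which class of client it needs.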
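The handle-level fix is in `taskHandle.Kill`: the graceful stop now goes through the no-timeout client with an explicit context of `killTimeout + dockerTimeout`, so a long `kill_timeout` can no longer be cut short by the ordinary five-minute API client. A hedged sketch of the same shape follows; `stopGracefully` and `apiSlack` are illustrative names, not the driver's actual API.

```go
package clientcache

import (
	"context"
	"time"

	docker "github.com/fsouza/go-dockerclient"
)

// stopGracefully stops a container via the no-timeout ("infinity") client.
// Docker is given the full killTimeout to deliver the signal and wait before
// it force-kills, while the HTTP context gets killTimeout plus some slack so
// the request is not abandoned while the daemon is still inside its
// graceful-stop window.
func stopGracefully(infinity *docker.Client, containerID string, killTimeout, apiSlack time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), killTimeout+apiSlack)
	defer cancel()

	// StopContainerWithContext takes the stop timeout in whole seconds.
	return infinity.StopContainerWithContext(containerID, uint(killTimeout.Seconds()), ctx)
}
```

In the patch, the slack is the driver's `dockerTimeout` constant, matching the comment about giving the context "some wiggle room beyond the kill timeout docker will use" so even a forced kill stays on the happy path.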