drivers/docker: refactor use of clients in docker driver (#17731)

* drivers/docker: refactor use of clients in docker driver

This PR refactors how we manage the two underlying clients used by the
docker driver for communicating with the docker daemon. We keep two
clients: one with a hard-coded timeout that applies to every operation,
intended for short-lived / async calls to docker; the other with no
timeout, where it is the caller's responsibility to set a context that
ensures the call eventually terminates.

The use of these two clients has been confusing, and in a number of
places calls were made with the wrong client.

This PR makes it so that a caller must explicitly call a function to get
the client that makes sense for that use case.
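
As a rough sketch of the intended calling convention (not code from this
diff: the two helper methods and the one-minute grace value below are made
up for illustration, while the getDockerClient/getInfinityClient accessors
and the go-dockerclient calls are real):

```go
// Illustrative only; these helpers are not part of the Nomad docker driver.
// Short-lived calls use the fixed-timeout client; long-running calls use
// the no-timeout client and must bound themselves with a context.
package docker

import (
    "context"
    "time"

    docker "github.com/fsouza/go-dockerclient"
)

// listContainers is a quick, bounded call, so the fixed-timeout client is
// the right choice and no extra context management is needed.
func (d *Driver) listContainers() ([]docker.APIContainers, error) {
    dockerClient, err := d.getDockerClient()
    if err != nil {
        return nil, err
    }
    return dockerClient.ListContainers(docker.ListContainersOptions{All: true})
}

// stopWithKillTimeout may legitimately run for a long time, so it uses the
// no-timeout client and supplies its own context, padded slightly beyond
// the timeout docker is asked to honor.
func (d *Driver) stopWithKillTimeout(id string, killTimeout time.Duration) error {
    infinityClient, err := d.getInfinityClient()
    if err != nil {
        return err
    }
    ctx, cancel := context.WithTimeout(context.Background(), killTimeout+time.Minute)
    defer cancel()
    return infinityClient.StopContainerWithContext(id, uint(killTimeout.Seconds()), ctx)
}
```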

Fixes #17023

* cr: followup items
Seth Hoenig 2023-06-26 15:21:42 -05:00 committed by GitHub
parent 4c6906d873
commit d590123637
9 changed files with 188 additions and 124 deletions

.changelog/17731.txt

@@ -0,0 +1,3 @@
+```release-note:bug
+drivers/docker: Fixed a bug where long-running docker operations would incorrectly timeout
+```


@@ -765,7 +765,7 @@ func (d *Driver) SetConfig(c *base.Config) error {
         d.clientConfig = c.AgentConfig.Driver
     }
 
-    dockerClient, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         return fmt.Errorf("failed to get docker client: %v", err)
     }


@@ -30,6 +30,7 @@ import (
     "github.com/hashicorp/nomad/drivers/shared/eventer"
     "github.com/hashicorp/nomad/drivers/shared/hostnames"
     "github.com/hashicorp/nomad/drivers/shared/resolvconf"
+    "github.com/hashicorp/nomad/helper"
     nstructs "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/plugins/base"
     "github.com/hashicorp/nomad/plugins/drivers"
@@ -39,19 +40,6 @@ import (
 )
 
 var (
-    // createClientsLock is a lock that protects reading/writing global client
-    // variables
-    createClientsLock sync.Mutex
-
-    // client is a docker client with a timeout of 5 minutes. This is for doing
-    // all operations with the docker daemon besides which are not long running
-    // such as creating, killing containers, etc.
-    client *docker.Client
-
-    // waitClient is a docker client with no timeouts. This is used for long
-    // running operations such as waiting on containers and collect stats
-    waitClient *docker.Client
-
     dockerTransientErrs = []string{
         "Client.Timeout exceeded while awaiting headers",
         "EOF",
@@ -159,6 +147,10 @@ type Driver struct {
     detected     bool
     detectedLock sync.RWMutex
 
+    dockerClientLock sync.Mutex
+    dockerClient     *docker.Client // for most docker api calls (use getDockerClient())
+    infinityClient   *docker.Client // for wait and stop calls (use getInfinityClient())
+
     danglingReconciler *containerReconciler
     cpusetFixer        CpusetFixer
 }
@@ -229,12 +221,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
         return fmt.Errorf("failed to decode driver task state: %v", err)
     }
 
-    client, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
-        return fmt.Errorf("failed to get docker client: %v", err)
+        return fmt.Errorf("failed to get docker client: %w", err)
     }
 
-    container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    infinityClient, err := d.getInfinityClient()
+    if err != nil {
+        return fmt.Errorf("failed to get docker long operations client: %w", err)
+    }
+
+    container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: handleState.ContainerID,
     })
     if err != nil {
@@ -242,8 +239,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
     }
 
     h := &taskHandle{
-        client:              client,
-        waitClient:          waitClient,
+        dockerClient:        dockerClient,
+        infinityClient:      infinityClient,
         logger:              d.logger.With("container_id", container.ID),
         task:                handle.Config,
         containerID:         container.ID,
@@ -261,14 +258,14 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
     h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
     if err != nil {
-        if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
+        if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil {
             d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
         }
         return fmt.Errorf("failed to setup replacement docker logger: %v", err)
     }
 
     if err := handle.SetDriverState(h.buildState()); err != nil {
-        if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
+        if err := dockerClient.StopContainer(handleState.ContainerID, 0); err != nil {
             d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
         }
         return fmt.Errorf("failed to store driver state: %v", err)
@@ -315,13 +312,19 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
     handle := drivers.NewTaskHandle(taskHandleVersion)
     handle.Config = cfg
 
-    // Initialize docker API clients
-    client, _, err := d.dockerClients()
+    // we'll need the normal docker client
+    dockerClient, err := d.getDockerClient()
     if err != nil {
-        return nil, nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
+        return nil, nil, fmt.Errorf("Failed to create docker client: %v", err)
     }
 
-    id, err := d.createImage(cfg, &driverConfig, client)
+    // and also the long operations client
+    infinityClient, err := d.getInfinityClient()
+    if err != nil {
+        return nil, nil, fmt.Errorf("Failed to create long operations docker client: %v", err)
+    }
+
+    id, err := d.createImage(cfg, &driverConfig, dockerClient)
     if err != nil {
         return nil, nil, err
     }
@@ -342,10 +345,10 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
     startAttempts := 0
 CREATE:
-    container, err := d.createContainer(client, containerCfg, driverConfig.Image)
+    container, err := d.createContainer(dockerClient, containerCfg, driverConfig.Image)
     if err != nil {
         d.logger.Error("failed to create container", "error", err)
-        client.RemoveContainer(docker.RemoveContainerOptions{
+        dockerClient.RemoveContainer(docker.RemoveContainerOptions{
             ID:    containerCfg.Name,
             Force: true,
         })
@@ -361,7 +364,7 @@ CREATE:
     // Start the container
     if err := d.startContainer(container); err != nil {
         d.logger.Error("failed to start container", "container_id", container.ID, "error", err)
-        client.RemoveContainer(docker.RemoveContainerOptions{
+        dockerClient.RemoveContainer(docker.RemoveContainerOptions{
             ID:    container.ID,
             Force: true,
         })
@@ -376,17 +379,17 @@ CREATE:
     // Inspect container to get all of the container metadata as much of the
     // metadata (eg networking) isn't populated until the container is started
-    runningContainer, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    runningContainer, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: container.ID,
     })
     if err != nil {
-        client.RemoveContainer(docker.RemoveContainerOptions{
+        dockerClient.RemoveContainer(docker.RemoveContainerOptions{
             ID:    container.ID,
             Force: true,
         })
         msg := "failed to inspect started container"
         d.logger.Error(msg, "error", err)
-        client.RemoveContainer(docker.RemoveContainerOptions{
+        dockerClient.RemoveContainer(docker.RemoveContainerOptions{
             ID:    container.ID,
             Force: true,
         })
@@ -419,7 +422,7 @@ CREATE:
         dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
         if err != nil {
             d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
-            client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
+            dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
             return nil, nil, err
         }
     }
@@ -435,8 +438,8 @@ CREATE:
     // Return a driver handle
     h := &taskHandle{
-        client:              client,
-        waitClient:          waitClient,
+        dockerClient:        dockerClient,
+        infinityClient:      infinityClient,
         dlogger:             dlogger,
         dloggerPluginClient: pluginClient,
         logger:              d.logger.With("container_id", container.ID),
@@ -455,7 +458,7 @@ CREATE:
             dlogger.Stop()
             pluginClient.Kill()
         }
-        client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
+        dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
         return nil, nil, err
     }
@@ -548,10 +551,15 @@ CREATE:
 // startContainer starts the passed container. It attempts to handle any
 // transient Docker errors.
 func (d *Driver) startContainer(c *docker.Container) error {
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return err
+    }
+
     // Start a container
     attempted := 0
 START:
-    startErr := client.StartContainer(c.ID, c.HostConfig)
+    startErr := dockerClient.StartContainer(c.ID, c.HostConfig)
     if startErr == nil || strings.Contains(startErr.Error(), "Container already running") {
         return nil
     }
@@ -691,7 +699,12 @@ func (d *Driver) loadImage(task *drivers.TaskConfig, driverConfig *TaskConfig, c
 }
 
 func (d *Driver) convertAllocPathsForWindowsLCOW(task *drivers.TaskConfig, image string) error {
-    imageConfig, err := client.InspectImage(image)
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return err
+    }
+
+    imageConfig, err := dockerClient.InspectImage(image)
     if err != nil {
         return fmt.Errorf("the image does not exist: %v", err)
     }
@@ -763,7 +776,7 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf
 
 func (d *Driver) findPauseContainer(allocID string) (string, error) {
-    _, dockerClient, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         return "", err
     }
@@ -799,7 +812,7 @@ func (d *Driver) findPauseContainer(allocID string) (string, error) {
 // tracking. Basically just scan all containers and pull the ID from anything
 // that has the Nomad Label and has Name with prefix "/nomad_init_".
 func (d *Driver) recoverPauseContainers(ctx context.Context) {
-    _, dockerClient, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         d.logger.Error("failed to recover pause containers", "error", err)
         return
@@ -1459,11 +1472,11 @@ func (d *Driver) detectIP(c *docker.Container, driverConfig *TaskConfig) (string
 // if the container is dead or can't be found.
 func (d *Driver) containerByName(name string) (*docker.Container, error) {
-    client, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         return nil, err
     }
 
-    containers, err := client.ListContainers(docker.ListContainersOptions{
+    containers, err := dockerClient.ListContainers(docker.ListContainersOptions{
         All: true,
     })
     if err != nil {
@@ -1495,7 +1508,7 @@ OUTER:
         return nil, nil
     }
 
-    container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: shimContainer.ID,
     })
     if err != nil {
@@ -1563,7 +1576,12 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
         return drivers.ErrTaskNotFound
     }
 
-    c, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return err
+    }
+
+    c, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: h.containerID,
     })
     if err != nil {
@@ -1579,13 +1597,13 @@ func (d *Driver) DestroyTask(taskID string, force bool) error {
         if !force {
             return fmt.Errorf("must call StopTask for the given task before Destroy or set force to true")
         }
-        if err := h.client.StopContainer(h.containerID, 0); err != nil {
+        if err := dockerClient.StopContainer(h.containerID, 0); err != nil {
             h.logger.Warn("failed to stop container during destroy", "error", err)
         }
     }
 
     if h.removeContainerOnExit {
-        if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil {
+        if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil {
             h.logger.Error("error removing container", "error", err)
         }
     } else {
@@ -1621,7 +1639,12 @@ func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
         return nil, drivers.ErrTaskNotFound
     }
 
-    container, err := client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return nil, err
+    }
+
+    container, err := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: h.containerID,
     })
     if err != nil {
@@ -1723,7 +1746,13 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri
         Container:    h.containerID,
         Context:      ctx,
     }
-    exec, err := h.client.CreateExec(createExecOpts)
+
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return nil, err
+    }
+
+    exec, err := dockerClient.CreateExec(createExecOpts)
     if err != nil {
         return nil, fmt.Errorf("failed to create exec object: %v", err)
     }
@@ -1739,7 +1768,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri
                 if !ok {
                     return
                 }
-                client.ResizeExecTTY(exec.ID, s.Height, s.Width)
+                dockerClient.ResizeExecTTY(exec.ID, s.Height, s.Width)
             }
         }
     }()
@@ -1758,7 +1787,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri
         ErrorStream:  opts.Stderr,
         Context:      ctx,
     }
 
-    if err := client.StartExec(exec.ID, startOpts); err != nil {
+    if err := dockerClient.StartExec(exec.ID, startOpts); err != nil {
         return nil, fmt.Errorf("failed to start exec: %v", err)
     }
@@ -1769,7 +1798,7 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri
     start := time.Now()
     var res *docker.ExecInspect
     for (res == nil || res.Running) && time.Since(start) <= execTerminatingTimeout {
-        res, err = client.InspectExec(exec.ID)
+        res, err = dockerClient.InspectExec(exec.ID)
         if err != nil {
             return nil, fmt.Errorf("failed to inspect exec result: %v", err)
         }
@@ -1785,37 +1814,37 @@ func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *dri
     }, nil
 }
 
-// dockerClients creates two *docker.Client, one for long running operations and
-// the other for shorter operations. In test / dev mode we can use ENV vars to
-// connect to the docker daemon. In production mode we will read docker.endpoint
-// from the config file.
-func (d *Driver) dockerClients() (*docker.Client, *docker.Client, error) {
-    createClientsLock.Lock()
-    defer createClientsLock.Unlock()
-
-    if client != nil && waitClient != nil {
-        return client, waitClient, nil
-    }
-
-    var err error
-
-    // Only initialize the client if it hasn't yet been done
-    if client == nil {
-        client, err = d.newDockerClient(dockerTimeout)
-        if err != nil {
-            return nil, nil, err
-        }
-    }
-
-    // Only initialize the waitClient if it hasn't yet been done
-    if waitClient == nil {
-        waitClient, err = d.newDockerClient(0 * time.Minute)
-        if err != nil {
-            return nil, nil, err
-        }
-    }
-
-    return client, waitClient, nil
+func (d *Driver) getOrCreateClient(timeout time.Duration) (*docker.Client, error) {
+    var (
+        client *docker.Client
+        err    error
+    )
+
+    helper.WithLock(&d.dockerClientLock, func() {
+        if timeout == 0 {
+            if d.infinityClient == nil {
+                d.infinityClient, err = d.newDockerClient(0)
+            }
+            client = d.infinityClient
+        } else {
+            if d.dockerClient == nil {
+                d.dockerClient, err = d.newDockerClient(timeout)
+            }
+            client = d.dockerClient
+        }
+    })
+
+    return client, err
+}
+
+// getInfinityClient creates a docker API client with no timeout.
+func (d *Driver) getInfinityClient() (*docker.Client, error) {
+    return d.getOrCreateClient(0)
+}
+
+// getDockerClient creates a docker API client with a hard-coded timeout.
+func (d *Driver) getDockerClient() (*docker.Client, error) {
+    return d.getOrCreateClient(dockerTimeout)
 }
 
 // newDockerClient creates a new *docker.Client with a configurable timeout
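
(The newDockerClient helper named in the trailing context line above is unchanged by this PR and not shown here. As a minimal sketch of what a "hard-coded timeout" client means in go-dockerclient terms, assuming the fsouza/go-dockerclient API and ignoring the endpoint/TLS handling the real helper performs, the hypothetical newBoundedDockerClient below is illustrative only:)

```go
package docker

import (
    "time"

    docker "github.com/fsouza/go-dockerclient"
)

// newBoundedDockerClient is a hypothetical stand-in for newDockerClient: it
// builds a client from the environment and, when a timeout is given, applies
// it to the underlying HTTP client so that every API call is bounded.
func newBoundedDockerClient(timeout time.Duration) (*docker.Client, error) {
    client, err := docker.NewClientFromEnv()
    if err != nil {
        return nil, err
    }
    if timeout > 0 {
        client.SetTimeout(timeout) // sets client.HTTPClient.Timeout
    }
    return client, nil
}
```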


@@ -25,6 +25,7 @@ import (
     "github.com/hashicorp/nomad/plugins/drivers"
     dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
     tu "github.com/hashicorp/nomad/testutil"
+    "github.com/shoenig/test/must"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )
@@ -125,25 +126,25 @@ func TestDockerDriver_NetworkMode_Host(t *testing.T) {
     copyImage(t, task.TaskDir(), "busybox.tar")
 
     _, _, err := d.StartTask(task)
-    require.NoError(t, err)
+    must.NoError(t, err)
 
-    require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
+    must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
 
     defer d.DestroyTask(task.ID, true)
 
     dockerDriver, ok := d.Impl().(*Driver)
-    require.True(t, ok)
+    must.True(t, ok)
 
     handle, ok := dockerDriver.tasks.Get(task.ID)
-    require.True(t, ok)
+    must.True(t, ok)
 
+    client := newTestDockerClient(t)
     container, err := client.InspectContainer(handle.containerID)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
+    must.NoError(t, err)
 
     actual := container.HostConfig.NetworkMode
-    require.Equal(t, expected, actual)
+    must.Eq(t, expected, actual)
 }
 
 func TestDockerDriver_CPUCFSPeriod(t *testing.T) {


@@ -98,7 +98,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
         return fp
     }
 
-    client, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         if d.fingerprintSuccessful() {
             d.logger.Info("failed to initialize client", "error", err)
@@ -110,10 +110,10 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
         }
     }
 
-    env, err := client.Version()
+    env, err := dockerClient.Version()
     if err != nil {
         if d.fingerprintSuccessful() {
-            d.logger.Debug("could not connect to docker daemon", "endpoint", client.Endpoint(), "error", err)
+            d.logger.Debug("could not connect to docker daemon", "endpoint", dockerClient.Endpoint(), "error", err)
         }
         d.setFingerprintFailure()
@@ -143,7 +143,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
         fp.Attributes["driver.docker.volumes.enabled"] = pstructs.NewBoolAttribute(true)
     }
 
-    if nets, err := client.ListNetworks(); err != nil {
+    if nets, err := dockerClient.ListNetworks(); err != nil {
         d.logger.Warn("error discovering bridge IP", "error", err)
     } else {
         for _, n := range nets {
@@ -167,7 +167,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
         }
     }
 
-    if dockerInfo, err := client.Info(); err != nil {
+    if dockerInfo, err := dockerClient.Info(); err != nil {
         d.logger.Warn("failed to get Docker system info", "error", err)
     } else {
         runtimeNames := make([]string, 0, len(dockerInfo.Runtimes))


@@ -25,8 +25,17 @@ import (
 )
 
 type taskHandle struct {
-    client     *docker.Client
-    waitClient *docker.Client
+    // dockerClient is useful for normal docker API calls. It should be used
+    // for all calls that aren't Wait() or Stop() (and their variations).
+    dockerClient *docker.Client
+
+    // infinityClient is useful for
+    // - the Wait docker API call(s) (no limit on container lifetime)
+    // - the Stop docker API call(s) (context with task kill_timeout required)
+    // Do not use this client for any other docker API calls, instead use the
+    // normal dockerClient which includes a default timeout.
+    infinityClient *docker.Client
+
     logger              hclog.Logger
     dlogger             docklog.DockerLogger
     dloggerPluginClient *plugin.Client
@@ -80,7 +89,7 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv
         Container:    h.containerID,
         Context:      ctx,
     }
-    exec, err := h.client.CreateExec(createExecOpts)
+    exec, err := h.dockerClient.CreateExec(createExecOpts)
     if err != nil {
         return nil, err
     }
@@ -95,12 +104,12 @@ func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*driv
         ErrorStream:  stderr,
         Context:      ctx,
     }
-    if err := client.StartExec(exec.ID, startOpts); err != nil {
+    if err := h.dockerClient.StartExec(exec.ID, startOpts); err != nil {
         return nil, err
     }
     execResult.Stdout = stdout.Bytes()
     execResult.Stderr = stderr.Bytes()
-    res, err := client.InspectExec(exec.ID)
+    res, err := h.dockerClient.InspectExec(exec.ID)
     if err != nil {
         return execResult, err
     }
@@ -120,13 +129,14 @@ func (h *taskHandle) Signal(ctx context.Context, s os.Signal) error {
     // MacOS signals to the correct signal number for docker. Or we change the
     // interface to take a signal string and leave it up to driver to map?
 
-    dockerSignal := docker.Signal(sysSig)
     opts := docker.KillContainerOptions{
         ID:      h.containerID,
-        Signal:  dockerSignal,
+        Signal:  docker.Signal(sysSig),
         Context: ctx,
     }
-    return h.client.KillContainer(opts)
+
+    // remember Kill just means send a signal; this is not the complex StopContainer case
+    return h.dockerClient.KillContainer(opts)
 }
 
 // parseSignal interprets the signal name into an os.Signal. If no name is
@@ -159,7 +169,13 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error {
     // Signal is used to kill the container with the desired signal before
     // calling StopContainer
     if signal == "" {
-        err = h.client.StopContainer(h.containerID, uint(killTimeout.Seconds()))
+        // give the context timeout some wiggle room beyond the kill timeout
+        // docker will use, so we can happy path even in the force kill case
+        graciousTimeout := killTimeout + dockerTimeout
+        ctx, cancel := context.WithTimeout(context.Background(), graciousTimeout)
+        defer cancel()
+
+        apiTimeout := uint(killTimeout.Seconds())
+        err = h.infinityClient.StopContainerWithContext(h.containerID, apiTimeout, ctx)
     } else {
         ctx, cancel := context.WithTimeout(context.Background(), killTimeout)
         defer cancel()
@@ -191,8 +207,8 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal string) error {
         case <-ctx.Done():
         }
 
-        // Stop the container
-        err = h.client.StopContainer(h.containerID, 0)
+        // Stop the container forcefully.
+        err = h.dockerClient.StopContainer(h.containerID, 0)
     }
 
     if err != nil {
@@ -230,7 +246,7 @@ func (h *taskHandle) shutdownLogger() {
 func (h *taskHandle) run() {
     defer h.shutdownLogger()
 
-    exitCode, werr := h.waitClient.WaitContainer(h.containerID)
+    exitCode, werr := h.infinityClient.WaitContainer(h.containerID)
     if werr != nil {
         h.logger.Error("failed to wait for container; already terminated")
     }
@@ -239,7 +255,7 @@ func (h *taskHandle) run() {
         werr = fmt.Errorf("Docker container exited with non-zero exit code: %d", exitCode)
     }
 
-    container, ierr := h.waitClient.InspectContainerWithOptions(docker.InspectContainerOptions{
+    container, ierr := h.dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: h.containerID,
     })
     oom := false
@@ -264,8 +280,8 @@ func (h *taskHandle) run() {
     close(h.doneCh)
 
     // Stop the container just incase the docker daemon's wait returned
-    // incorrectly
-    if err := h.client.StopContainer(h.containerID, 0); err != nil {
+    // incorrectly.
+    if err := h.dockerClient.StopContainer(h.containerID, 0); err != nil {
         _, noSuchContainer := err.(*docker.NoSuchContainer)
         _, containerNotRunning := err.(*docker.ContainerNotRunning)
         if !containerNotRunning && !noSuchContainer {


@@ -26,7 +26,7 @@ const (
 func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreateRequest) (*drivers.NetworkIsolationSpec, bool, error) {
     // Initialize docker API clients
-    client, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         return nil, false, fmt.Errorf("failed to connect to docker daemon: %s", err)
     }
@@ -68,7 +68,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate
         return specFromContainer(container, createSpec.Hostname), false, nil
     }
 
-    container, err = d.createContainer(client, *config, d.config.InfraImage)
+    container, err = d.createContainer(dockerClient, *config, d.config.InfraImage)
     if err != nil {
         return nil, false, err
     }
@@ -79,7 +79,7 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate
     // until the container is started, InspectContainerWithOptions
     // returns a mostly-empty struct
-    container, err = client.InspectContainerWithOptions(docker.InspectContainerOptions{
+    container, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{
         ID: container.ID,
     })
     if err != nil {
@@ -122,17 +122,17 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp
     // let the background reconciliation keep trying
     d.pauseContainers.remove(id)
 
-    client, _, err := d.dockerClients()
+    dockerClient, err := d.getDockerClient()
     if err != nil {
         return fmt.Errorf("failed to connect to docker daemon: %s", err)
     }
 
     timeout := uint(1) // this is the pause container, just kill it fast
-    if err := client.StopContainerWithContext(id, timeout, d.ctx); err != nil {
+    if err := dockerClient.StopContainerWithContext(id, timeout, d.ctx); err != nil {
         d.logger.Warn("failed to stop pause container", "id", id, "error", err)
     }
 
-    if err := client.RemoveContainer(docker.RemoveContainerOptions{
+    if err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{
         Force: true,
         ID:    id,
     }); err != nil {
@@ -144,7 +144,7 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp
     // The Docker image ID is needed in order to correctly update the image
     // reference count. Any error finding this, however, should not result
     // in an error shutting down the allocrunner.
-    dockerImage, err := client.InspectImage(d.config.InfraImage)
+    dockerImage, err := dockerClient.InspectImage(d.config.InfraImage)
     if err != nil {
         d.logger.Warn("InspectImage failed for infra_image container destroy",
             "image", d.config.InfraImage, "error", err)
@@ -187,6 +187,11 @@ func (d *Driver) createSandboxContainerConfig(allocID string, createSpec *driver
 func (d *Driver) pullInfraImage(allocID string) error {
     repo, tag := parseDockerImage(d.config.InfraImage)
 
+    dockerClient, err := d.getDockerClient()
+    if err != nil {
+        return err
+    }
+
     // There's a (narrow) time-of-check-time-of-use race here. If we call
     // InspectImage and then a concurrent task shutdown happens before we call
     // IncrementImageReference, we could end up removing the image, and it
@@ -194,7 +199,7 @@ func (d *Driver) pullInfraImage(allocID string) error {
     d.coordinator.imageLock.Lock()
 
     if tag != "latest" {
-        dockerImage, err := client.InspectImage(d.config.InfraImage)
+        dockerImage, err := dockerClient.InspectImage(d.config.InfraImage)
         if err != nil {
             d.logger.Debug("InspectImage failed for infra_image container pull",
                 "image", d.config.InfraImage, "error", err)


@@ -22,10 +22,10 @@ import (
 // creation API call fail with a network error. containerReconciler
 // scans for these untracked containers and kill them.
 type containerReconciler struct {
-    ctx    context.Context
-    config *ContainerGCConfig
-    client *docker.Client
-    logger hclog.Logger
+    ctx       context.Context
+    config    *ContainerGCConfig
+    logger    hclog.Logger
+    getClient func() (*docker.Client, error)
 
     isDriverHealthy   func() bool
     trackedContainers func() *set.Set[string]
@@ -36,10 +36,10 @@ type containerReconciler struct {
 func newReconciler(d *Driver) *containerReconciler {
     return &containerReconciler{
-        ctx:    d.ctx,
-        config: &d.config.GC.DanglingContainers,
-        client: client,
-        logger: d.logger,
+        ctx:       d.ctx,
+        config:    &d.config.GC.DanglingContainers,
+        getClient: d.getDockerClient,
+        logger:    d.logger,
 
         isDriverHealthy:   func() bool { return d.previouslyDetected() && d.fingerprintSuccessful() },
         trackedContainers: d.trackedContainers,
@@ -109,9 +109,14 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
         return nil
     }
 
+    dockerClient, err := r.getClient()
+    if err != nil {
+        return err
+    }
+
     for _, id := range untracked.Slice() {
         ctx, cancel := r.dockerAPIQueryContext()
-        err := client.RemoveContainer(docker.RemoveContainerOptions{
+        err := dockerClient.RemoveContainer(docker.RemoveContainerOptions{
             Context: ctx,
             ID:      id,
             Force:   true,
@@ -135,7 +140,12 @@ func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cuto
     ctx, cancel := r.dockerAPIQueryContext()
     defer cancel()
 
-    cc, err := client.ListContainers(docker.ListContainersOptions{
+    dockerClient, err := r.getClient()
+    if err != nil {
+        return nil, err
+    }
+
+    cc, err := dockerClient.ListContainers(docker.ListContainersOptions{
         Context: ctx,
         All:     false, // only reconcile running containers
     })


@@ -131,7 +131,7 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
     }
 
     // Stats blocks until an error has occurred, or doneCh has been closed
-    if err := h.client.Stats(statsOpts); err != nil && err != io.ErrClosedPipe {
+    if err := h.dockerClient.Stats(statsOpts); err != nil && err != io.ErrClosedPipe {
         // An error occurred during stats collection, retry with backoff
         h.logger.Debug("error collecting stats from container", "error", err)