docker: fix bug where network pause containers would be erroneously reconciled (#16352)

* docker: fix bug where network pause containers would be erroneously gc'd

* docker: cl: thread context from driver into pause container restoration
Seth Hoenig 2023-03-07 12:17:32 -06:00 committed by GitHub
parent 05fff34fc8
commit 835365d2a4
6 changed files with 250 additions and 111 deletions

.changelog/16352.txt Normal file

@@ -0,0 +1,3 @@
```release-note:bug
docker: Fixed a bug where pause containers would be erroneously removed
```
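For context: the driver's dangling-container reconciler force-removes any running Nomad-labeled container that is missing from its tracked set, and the pause containers created for bridge networking were never added to that set. Below is a minimal sketch of the set arithmetic behind the fix, using the same `hashicorp/go-set` package the commit introduces; all container IDs are invented for illustration.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-set"
)

// untracked returns the members of running that are absent from tracked,
// mirroring the reconciler's filter. Only set helpers that appear in the
// diff below are used (From, Union, Contains, Insert, Slice).
func untracked(running, tracked *set.Set[string]) *set.Set[string] {
	out := set.New[string](10)
	for _, id := range running.Slice() {
		if !tracked.Contains(id) {
			out.Insert(id)
		}
	}
	return out
}

func main() {
	tasks := set.From([]string{"task-a", "task-b"})
	pause := set.From([]string{"pause-net-1"})
	running := set.From([]string{"task-a", "task-b", "pause-net-1", "stray"})

	// Before the fix: tracked = task containers only, so the pause
	// container is wrongly classified as dangling and removed.
	fmt.Println(untracked(running, tasks).Slice()) // pause-net-1 and stray, in some order

	// After the fix: tracked = task containers ∪ pause containers.
	fmt.Println(untracked(running, tasks.Union(pause)).Slice()) // only stray
}
```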


@@ -20,6 +20,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/drivers/docker/docklog"
@@ -84,6 +85,35 @@ const (
dockerLabelNodeID = "com.hashicorp.nomad.node_id"
)
type pauseContainerStore struct {
lock sync.Mutex
containerIDs *set.Set[string]
}
func newPauseContainerStore() *pauseContainerStore {
return &pauseContainerStore{
containerIDs: set.New[string](10),
}
}
func (s *pauseContainerStore) add(id string) {
s.lock.Lock()
defer s.lock.Unlock()
s.containerIDs.Insert(id)
}
func (s *pauseContainerStore) remove(id string) {
s.lock.Lock()
defer s.lock.Unlock()
s.containerIDs.Remove(id)
}
func (s *pauseContainerStore) union(other *set.Set[string]) *set.Set[string] {
s.lock.Lock()
defer s.lock.Unlock()
return other.Union(s.containerIDs)
}
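A hedged sketch of how the store is meant to be used (the helper below is invented and assumes this same package): `add` and `remove` are called from the network create and destroy paths, while `union` hands callers a fresh set, so the mutex-guarded set is never iterated outside the lock.

```go
package docker

import "github.com/hashicorp/go-set"

// exampleStoreUsage is an illustration only, not part of this commit.
func exampleStoreUsage() []string {
	store := newPauseContainerStore()
	store.add("pause-1")    // CreateNetwork path
	store.add("pause-2")
	store.remove("pause-2") // DestroyNetwork path

	taskIDs := set.From([]string{"task-1"})

	// union returns a new set (task IDs ∪ pause container IDs), so the
	// result can be read and modified without holding the store's lock.
	return store.union(taskIDs).Slice()
}
```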
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
@@ -104,6 +134,9 @@ type Driver struct {
// tasks is the in memory datastore mapping taskIDs to taskHandles
tasks *taskStore
// pauseContainers keeps track of pause container IDs in use by allocations
pauseContainers *pauseContainerStore
// coordinator is what tracks multiple image pulls against the same docker image
coordinator *dockerCoordinator
@@ -130,13 +163,16 @@ type Driver struct {
// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
driver := &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
pauseContainers: newPauseContainerStore(),
ctx: ctx,
logger: logger,
}
go driver.recoverPauseContainers(ctx)
return driver
}
func (d *Driver) reattachToDockerLogger(reattachConfig *pstructs.ReattachConfig) (docklog.DockerLogger, *plugin.Client, error) {
@@ -238,6 +274,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
}
d.tasks.Set(handle.Config.ID, h)
// find a pause container?
go h.run()
return nil
@@ -709,6 +748,40 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf
return binds, nil
}
func (d *Driver) recoverPauseContainers(ctx context.Context) {
// On client restart we must rebuild the set of pause containers we are
// tracking: scan all running containers and record the ID of any container
// that has the Nomad alloc-ID label and a name with the "/nomad_init_" prefix.
_, dockerClient, err := d.dockerClients()
if err != nil {
d.logger.Error("failed to recover pause containers", "error", err)
return
}
containers, listErr := dockerClient.ListContainers(docker.ListContainersOptions{
Context: ctx,
All: false, // running only
Filters: map[string][]string{
"label": {dockerLabelAllocID},
},
})
if listErr != nil {
d.logger.Error("failed to list pause containers", "error", err)
return
}
CONTAINER:
for _, c := range containers {
for _, name := range c.Names {
if strings.HasPrefix(name, "/nomad_init_") {
d.pauseContainers.add(c.ID)
continue CONTAINER
}
}
}
}
var userMountToUnixMount = map[string]string{
// Empty string maps to `rprivate` for backwards compatibility in restored
// older tasks, where mount propagation will not be present.
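Restating the recovery predicate in isolation: the list call above already filters on the alloc-ID label, and the name check then narrows the result to pause containers, whose names Docker reports with a leading slash. A self-contained sketch, with an invented function name and sample ID:

```go
package main

import (
	"fmt"
	"strings"

	docker "github.com/fsouza/go-dockerclient"
)

// isPauseContainer mirrors the loop in recoverPauseContainers: a container
// qualifies when at least one of its names carries the "/nomad_init_"
// prefix (Docker prefixes every container name with "/").
func isPauseContainer(c docker.APIContainers) bool {
	for _, name := range c.Names {
		if strings.HasPrefix(name, "/nomad_init_") {
			return true
		}
	}
	return false
}

func main() {
	c := docker.APIContainers{Names: []string{"/nomad_init_5a3bffa4-example"}} // fabricated
	fmt.Println(isPauseContainer(c)) // true
}
```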


@@ -83,10 +83,19 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate
return nil, false, err
}
// keep track of this pause container for reconciliation
d.pauseContainers.add(container.ID)
return specFromContainer(container, createSpec.Hostname), true, nil
}
func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSpec) error {
id := spec.Labels[dockerNetSpecLabelKey]
// no longer tracking this pause container; even if we fail here we should
// let the background reconciliation keep trying
d.pauseContainers.remove(id)
client, _, err := d.dockerClients()
if err != nil {
return fmt.Errorf("failed to connect to docker daemon: %s", err)
@@ -94,7 +103,7 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp
if err := client.RemoveContainer(docker.RemoveContainerOptions{
Force: true,
ID: spec.Labels[dockerNetSpecLabelKey],
ID: id,
}); err != nil {
return err
}


@@ -9,6 +9,7 @@ import (
docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set"
)
// containerReconciler detects and kills unexpectedly running containers.
@@ -24,7 +25,7 @@ type containerReconciler struct {
logger hclog.Logger
isDriverHealthy func() bool
trackedContainers func() map[string]bool
trackedContainers func() *set.Set[string]
isNomadContainer func(c docker.APIContainers) bool
once sync.Once
@@ -96,7 +97,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
return fmt.Errorf("failed to find untracked containers: %v", err)
}
if len(untracked) == 0 {
if untracked.Empty() {
return nil
}
@@ -105,7 +106,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
return nil
}
for _, id := range untracked {
for _, id := range untracked.Slice() {
ctx, cancel := r.dockerAPIQueryContext()
err := client.RemoveContainer(docker.RemoveContainerOptions{
Context: ctx,
@@ -125,8 +126,8 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
// untrackedContainers returns the IDs of containers that are suspected to
// have been started by Nomad but aren't tracked by this driver
func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutoffTime time.Time) ([]string, error) {
result := []string{}
func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cutoffTime time.Time) (*set.Set[string], error) {
result := set.New[string](10)
ctx, cancel := r.dockerAPIQueryContext()
defer cancel()
@@ -142,7 +143,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
cutoff := cutoffTime.Unix()
for _, c := range cc {
if tracked[c.ID] {
if tracked.Contains(c.ID) {
continue
}
@@ -154,9 +155,8 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
continue
}
result = append(result, c.ID)
result.Insert(c.ID)
}
return result, nil
}
@@ -165,7 +165,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
//
// We'll try hitting Docker API on subsequent iteration.
func (r *containerReconciler) dockerAPIQueryContext() (context.Context, context.CancelFunc) {
// use a reasoanble floor to avoid very small limit
// use a reasonable floor to avoid very small limit
timeout := 30 * time.Second
if timeout < r.config.period {
@@ -211,18 +211,15 @@ func hasNomadName(c docker.APIContainers) bool {
return true
}
}
return false
}
func (d *Driver) trackedContainers() map[string]bool {
d.tasks.lock.RLock()
defer d.tasks.lock.RUnlock()
r := make(map[string]bool, len(d.tasks.store))
for _, h := range d.tasks.store {
r[h.containerID] = true
}
return r
// trackedContainers returns the set of container IDs of containers that were
// started by Driver and are expected to be running. This includes both normal
// Task containers, as well as infra pause containers.
func (d *Driver) trackedContainers() *set.Set[string] {
// collect the task containers
ids := d.tasks.IDs()
// now also accumulate pause containers
return d.pauseContainers.union(ids)
}
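Condensing the reconciler hunks above: a running container is a removal candidate only if it clears three guards. A sketch of that filter, assuming it sits in the same package as the reconciler (not the verbatim source):

```go
// untrackedSketch illustrates the filtering in untrackedContainers.
func untrackedSketch(cc []docker.APIContainers, tracked *set.Set[string], cutoff int64) *set.Set[string] {
	result := set.New[string](10)
	for _, c := range cc {
		if tracked.Contains(c.ID) {
			continue // owned by a task or a pause network
		}
		if c.Created >= cutoff {
			continue // too new; its task may still be registering
		}
		if !hasNomadName(c) {
			continue // not a Nomad-managed container
		}
		result.Insert(c.ID)
	}
	return result
}
```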


@@ -2,30 +2,30 @@ package docker
import (
"encoding/json"
"fmt"
"os"
"regexp"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/shoenig/test/must"
)
func fakeContainerList(t *testing.T) (nomadContainer, nonNomadContainer docker.APIContainers) {
path := "./test-resources/docker/reconciler_containers_list.json"
f, err := os.Open(path)
if err != nil {
t.Fatalf("failed to open file: %v", err)
}
must.NoError(t, err, must.Sprintf("failed to open %s", path))
var sampleContainerList []docker.APIContainers
err = json.NewDecoder(f).Decode(&sampleContainerList)
if err != nil {
t.Fatalf("failed to decode container list: %v", err)
}
must.NoError(t, err, must.Sprint("failed to decode container list"))
return sampleContainerList[0], sampleContainerList[1]
}
@@ -35,15 +35,15 @@ func Test_HasMount(t *testing.T) {
nomadContainer, nonNomadContainer := fakeContainerList(t)
require.True(t, hasMount(nomadContainer, "/alloc"))
require.True(t, hasMount(nomadContainer, "/data"))
require.True(t, hasMount(nomadContainer, "/secrets"))
require.False(t, hasMount(nomadContainer, "/random"))
must.True(t, hasMount(nomadContainer, "/alloc"))
must.True(t, hasMount(nomadContainer, "/data"))
must.True(t, hasMount(nomadContainer, "/secrets"))
must.False(t, hasMount(nomadContainer, "/random"))
require.False(t, hasMount(nonNomadContainer, "/alloc"))
require.False(t, hasMount(nonNomadContainer, "/data"))
require.False(t, hasMount(nonNomadContainer, "/secrets"))
require.False(t, hasMount(nonNomadContainer, "/random"))
must.False(t, hasMount(nonNomadContainer, "/alloc"))
must.False(t, hasMount(nonNomadContainer, "/data"))
must.False(t, hasMount(nonNomadContainer, "/secrets"))
must.False(t, hasMount(nonNomadContainer, "/random"))
}
func Test_HasNomadName(t *testing.T) {
@@ -51,41 +51,45 @@ func Test_HasNomadName(t *testing.T) {
nomadContainer, nonNomadContainer := fakeContainerList(t)
require.True(t, hasNomadName(nomadContainer))
require.False(t, hasNomadName(nonNomadContainer))
must.True(t, hasNomadName(nomadContainer))
must.False(t, hasNomadName(nonNomadContainer))
}
// TestDanglingContainerRemoval asserts containers without corresponding tasks
// TestDanglingContainerRemoval_normal asserts containers without corresponding tasks
// are removed after the creation grace period.
func TestDanglingContainerRemoval(t *testing.T) {
func TestDanglingContainerRemoval_normal(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
// start two containers: one tracked nomad container, and one unrelated container
task, cfg, _ := dockerTask(t)
require.NoError(t, task.EncodeConcreteDriverConfig(cfg))
must.NoError(t, task.EncodeConcreteDriverConfig(cfg))
client, d, handle, cleanup := dockerSetup(t, task, nil)
defer cleanup()
require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
dockerClient, d, handle, cleanup := dockerSetup(t, task, nil)
t.Cleanup(cleanup)
nonNomadContainer, err := client.CreateContainer(docker.CreateContainerOptions{
// wait for task to start
must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second))
nonNomadContainer, err := dockerClient.CreateContainer(docker.CreateContainerOptions{
Name: "mytest-image-" + uuid.Generate(),
Config: &docker.Config{
Image: cfg.Image,
Cmd: append([]string{cfg.Command}, cfg.Args...),
},
})
require.NoError(t, err)
defer client.RemoveContainer(docker.RemoveContainerOptions{
ID: nonNomadContainer.ID,
Force: true,
must.NoError(t, err)
t.Cleanup(func() {
_ = dockerClient.RemoveContainer(docker.RemoveContainerOptions{
ID: nonNomadContainer.ID,
Force: true,
})
})
err = client.StartContainer(nonNomadContainer.ID, nil)
require.NoError(t, err)
err = dockerClient.StartContainer(nonNomadContainer.ID, nil)
must.NoError(t, err)
untrackedNomadContainer, err := client.CreateContainer(docker.CreateContainerOptions{
untrackedNomadContainer, err := dockerClient.CreateContainer(docker.CreateContainerOptions{
Name: "mytest-image-" + uuid.Generate(),
Config: &docker.Config{
Image: cfg.Image,
@@ -95,45 +99,47 @@ func TestDanglingContainerRemoval(t *testing.T) {
},
},
})
require.NoError(t, err)
defer client.RemoveContainer(docker.RemoveContainerOptions{
ID: untrackedNomadContainer.ID,
Force: true,
must.NoError(t, err)
t.Cleanup(func() {
_ = dockerClient.RemoveContainer(docker.RemoveContainerOptions{
ID: untrackedNomadContainer.ID,
Force: true,
})
})
err = client.StartContainer(untrackedNomadContainer.ID, nil)
require.NoError(t, err)
err = dockerClient.StartContainer(untrackedNomadContainer.ID, nil)
must.NoError(t, err)
dd := d.Impl().(*Driver)
reconciler := newReconciler(dd)
trackedContainers := map[string]bool{handle.containerID: true}
trackedContainers := set.From([]string{handle.containerID})
tf := reconciler.trackedContainers()
require.Contains(t, tf, handle.containerID)
require.NotContains(t, tf, untrackedNomadContainer)
require.NotContains(t, tf, nonNomadContainer.ID)
tracked := reconciler.trackedContainers()
must.Contains[string](t, handle.containerID, tracked)
must.NotContains[string](t, untrackedNomadContainer.ID, tracked)
must.NotContains[string](t, nonNomadContainer.ID, tracked)
// assert tracked containers should never be untracked
untracked, err := reconciler.untrackedContainers(trackedContainers, time.Now())
require.NoError(t, err)
require.NotContains(t, untracked, handle.containerID)
require.NotContains(t, untracked, nonNomadContainer.ID)
require.Contains(t, untracked, untrackedNomadContainer.ID)
must.NoError(t, err)
must.NotContains[string](t, handle.containerID, untracked)
must.NotContains[string](t, nonNomadContainer.ID, untracked)
must.Contains[string](t, untrackedNomadContainer.ID, untracked)
// assert we recognize nomad containers with appropriate cutoff
untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now())
require.NoError(t, err)
require.Contains(t, untracked, handle.containerID)
require.Contains(t, untracked, untrackedNomadContainer.ID)
require.NotContains(t, untracked, nonNomadContainer.ID)
untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now())
must.NoError(t, err)
must.Contains[string](t, handle.containerID, untracked)
must.Contains[string](t, untrackedNomadContainer.ID, untracked)
must.NotContains[string](t, nonNomadContainer.ID, untracked)
// but ignore if creation happened before cutoff
untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now().Add(-1*time.Minute))
require.NoError(t, err)
require.NotContains(t, untracked, handle.containerID)
require.NotContains(t, untracked, untrackedNomadContainer.ID)
require.NotContains(t, untracked, nonNomadContainer.ID)
untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now().Add(-1*time.Minute))
must.NoError(t, err)
must.NotContains[string](t, handle.containerID, untracked)
must.NotContains[string](t, untrackedNomadContainer.ID, untracked)
must.NotContains[string](t, nonNomadContainer.ID, untracked)
// a full integration test to assert that containers are removed
pristineDriver := dockerDriverHarness(t, nil).Impl().(*Driver)
@@ -144,18 +150,54 @@ func TestDanglingContainerRemoval(t *testing.T) {
}
nReconciler := newReconciler(pristineDriver)
require.NoError(t, nReconciler.removeDanglingContainersIteration())
err = nReconciler.removeDanglingContainersIteration()
must.NoError(t, err)
_, err = client.InspectContainer(nonNomadContainer.ID)
require.NoError(t, err)
_, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: nonNomadContainer.ID})
must.NoError(t, err)
_, err = client.InspectContainer(handle.containerID)
require.Error(t, err)
require.Contains(t, err.Error(), NoSuchContainerError)
_, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: handle.containerID})
must.ErrorContains(t, err, NoSuchContainerError)
_, err = client.InspectContainer(untrackedNomadContainer.ID)
require.Error(t, err)
require.Contains(t, err.Error(), NoSuchContainerError)
_, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: untrackedNomadContainer.ID})
must.ErrorContains(t, err, NoSuchContainerError)
}
var (
dockerNetRe = regexp.MustCompile(`/var/run/docker/netns/[[:xdigit:]]`)
)
func TestDanglingContainerRemoval_network(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
testutil.RequireLinux(t) // bridge implies linux
dd := dockerDriverHarness(t, nil).Impl().(*Driver)
reconciler := newReconciler(dd)
// create a pause container
allocID := uuid.Generate()
spec, created, err := dd.CreateNetwork(allocID, &drivers.NetworkCreateRequest{
Hostname: "hello",
})
must.NoError(t, err)
must.True(t, created)
must.RegexMatch(t, dockerNetRe, spec.Path)
id := spec.Labels[dockerNetSpecLabelKey]
// execute reconciliation
err = reconciler.removeDanglingContainersIteration()
must.NoError(t, err)
dockerClient := newTestDockerClient(t)
c, iErr := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: id})
must.NoError(t, iErr)
must.Eq(t, "running", c.State.Status)
fmt.Println("state", c.State)
// cleanup pause container
err = dd.DestroyNetwork(allocID, spec)
must.NoError(t, err)
}
// TestDanglingContainerRemoval_Stopped asserts stopped containers without
@@ -166,8 +208,8 @@ func TestDanglingContainerRemoval_Stopped(t *testing.T) {
_, cfg, _ := dockerTask(t)
client := newTestDockerClient(t)
container, err := client.CreateContainer(docker.CreateContainerOptions{
dockerClient := newTestDockerClient(t)
container, err := dockerClient.CreateContainer(docker.CreateContainerOptions{
Name: "mytest-image-" + uuid.Generate(),
Config: &docker.Config{
Image: cfg.Image,
@@ -177,33 +219,35 @@ func TestDanglingContainerRemoval_Stopped(t *testing.T) {
},
},
})
require.NoError(t, err)
defer client.RemoveContainer(docker.RemoveContainerOptions{
ID: container.ID,
Force: true,
must.NoError(t, err)
t.Cleanup(func() {
_ = dockerClient.RemoveContainer(docker.RemoveContainerOptions{
ID: container.ID,
Force: true,
})
})
err = client.StartContainer(container.ID, nil)
require.NoError(t, err)
err = dockerClient.StartContainer(container.ID, nil)
must.NoError(t, err)
err = client.StopContainer(container.ID, 60)
require.NoError(t, err)
err = dockerClient.StopContainer(container.ID, 60)
must.NoError(t, err)
dd := dockerDriverHarness(t, nil).Impl().(*Driver)
reconciler := newReconciler(dd)
// assert nomad container is tracked, and we ignore stopped one
tf := reconciler.trackedContainers()
require.NotContains(t, tf, container.ID)
tracked := reconciler.trackedContainers()
must.NotContains[string](t, container.ID, tracked)
untracked, err := reconciler.untrackedContainers(map[string]bool{}, time.Now())
require.NoError(t, err)
require.NotContains(t, untracked, container.ID)
untracked, err := reconciler.untrackedContainers(set.New[string](0), time.Now())
must.NoError(t, err)
must.NotContains[string](t, container.ID, untracked)
// if we start container again, it'll be marked as untracked
require.NoError(t, client.StartContainer(container.ID, nil))
must.NoError(t, dockerClient.StartContainer(container.ID, nil))
untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now())
require.NoError(t, err)
require.Contains(t, untracked, container.ID)
untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now())
must.NoError(t, err)
must.Contains[string](t, container.ID, untracked)
}


@@ -2,6 +2,8 @@ package docker
import (
"sync"
"github.com/hashicorp/go-set"
)
type taskStore struct {
@@ -26,6 +28,17 @@ func (ts *taskStore) Get(id string) (*taskHandle, bool) {
return t, ok
}
func (ts *taskStore) IDs() *set.Set[string] {
ts.lock.RLock()
defer ts.lock.RUnlock()
s := set.New[string](len(ts.store))
for _, handle := range ts.store {
s.Insert(handle.containerID)
}
return s
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()