Merge pull request #5510 from hashicorp/b-docker-stats

docker: fix send after close panic in stats
This commit is contained in:
Michael Schurter 2019-04-02 10:01:28 -07:00 committed by GitHub
commit 77f99fc49c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 16 deletions

View File

@ -4,9 +4,11 @@ import (
"context"
"fmt"
"io"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/docker/util"
nstructs "github.com/hashicorp/nomad/nomad/structs"
@ -22,6 +24,55 @@ const (
statsCollectorBackoffLimit = 2 * time.Minute
)
// usageSender wraps a TaskResourceUsage chan so it can be sent to and
// closed from concurrent goroutines. Rather than blocking a slow sender,
// it applies backpressure by dropping events when the receiver is behind.
type usageSender struct {
	// mu guards closed and serializes send against close so a send can
	// never race a close of destCh.
	mu sync.Mutex

	// closed is latched by close; once set, send and close are no-ops.
	closed bool

	// destCh is the send side of the chan returned by newStatsChanPipe.
	destCh chan<- *structs.TaskResourceUsage
}
// newStatsChanPipe returns a usageSender that supports concurrent sending
// and closing, along with the receive end of the wrapped chan.
//
// The chan is buffered with capacity 1 so one pending stats interval can
// be queued without blocking the collector; further sends are dropped by
// usageSender.send. Consistently uses the cstructs alias (the original
// mixed structs/cstructs, which name the same package).
func newStatsChanPipe() (*usageSender, <-chan *cstructs.TaskResourceUsage) {
	destCh := make(chan *cstructs.TaskResourceUsage, 1)
	return &usageSender{destCh: destCh}, destCh
}
// send delivers tru to the receiver. It is safe to call concurrently with
// close; sends after close are silently dropped, as is any send made while
// the receiver still holds an undrained value (a missed interval).
func (u *usageSender) send(tru *cstructs.TaskResourceUsage) {
	u.mu.Lock()

	if u.closed {
		// Chan already closed; drop the event rather than panic.
		u.mu.Unlock()
		return
	}

	// Non-blocking send: a full buffer means the receiver is behind, so
	// this interval is skipped instead of stalling the collector.
	select {
	case u.destCh <- tru:
	default:
	}

	u.mu.Unlock()
}
// close shuts down the wrapped chan exactly once. It is idempotent and safe
// to call concurrently with send; any later send or close is a no-op.
func (u *usageSender) close() {
	u.mu.Lock()
	defer u.mu.Unlock()

	if !u.closed {
		// Latch first so a concurrent send observes closed before the
		// chan itself is closed.
		u.closed = true
		close(u.destCh)
	}
}
// Stats starts collecting stats from the docker daemon and sends them on the
// returned channel.
func (h *taskHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
@ -30,14 +81,16 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration) (<-chan
return nil, nstructs.NewRecoverableError(fmt.Errorf("container stopped"), false)
default:
}
ch := make(chan *cstructs.TaskResourceUsage, 1)
go h.collectStats(ctx, ch, interval)
return ch, nil
destCh, recvCh := newStatsChanPipe()
go h.collectStats(ctx, destCh, interval)
return recvCh, nil
}
// collectStats starts collecting resource usage stats of a docker container
func (h *taskHandle) collectStats(ctx context.Context, ch chan *cstructs.TaskResourceUsage, interval time.Duration) {
defer close(ch)
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration) {
defer destCh.close()
// backoff and retry used if the docker stats API returns an error
var backoff time.Duration
var retry int
@ -56,7 +109,7 @@ func (h *taskHandle) collectStats(ctx context.Context, ch chan *cstructs.TaskRes
// receive stats from docker and emit nomad stats
// statsCh will always be closed by docker client.
statsCh := make(chan *docker.Stats)
go dockerStatsCollector(ch, statsCh, interval)
go dockerStatsCollector(destCh, statsCh, interval)
statsOpts := docker.StatsOptions{
ID: h.containerID,
@ -86,7 +139,7 @@ func (h *taskHandle) collectStats(ctx context.Context, ch chan *cstructs.TaskRes
}
}
func dockerStatsCollector(destCh chan *cstructs.TaskResourceUsage, statsCh <-chan *docker.Stats, interval time.Duration) {
func dockerStatsCollector(destCh *usageSender, statsCh <-chan *docker.Stats, interval time.Duration) {
var resourceUsage *cstructs.TaskResourceUsage
// hasSentInitialStats is used so as to emit the first stats received from
@ -103,13 +156,12 @@ func dockerStatsCollector(destCh chan *cstructs.TaskResourceUsage, statsCh <-cha
if resourceUsage == nil {
continue
}
// sending to destCh could block, drop this interval if it does
select {
case destCh <- resourceUsage:
default:
// Backpressure caused missed interval
}
destCh.send(resourceUsage)
timer.Reset(interval)
case s, ok := <-statsCh:
// if statsCh is closed stop collection
if !ok {

View File

@ -2,20 +2,23 @@ package docker
import (
"runtime"
"sync"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/plugins/drivers"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/stretchr/testify/require"
)
func TestDriver_DockerStatsCollector(t *testing.T) {
t.Parallel()
require := require.New(t)
src := make(chan *docker.Stats)
defer close(src)
dst := make(chan *drivers.TaskResourceUsage)
defer close(dst)
dst, recvCh := newStatsChanPipe()
defer dst.close()
stats := &docker.Stats{}
stats.CPUStats.ThrottlingData.Periods = 10
stats.CPUStats.ThrottlingData.ThrottledPeriods = 10
@ -39,7 +42,7 @@ func TestDriver_DockerStatsCollector(t *testing.T) {
}
select {
case ru := <-dst:
case ru := <-recvCh:
if runtime.GOOS != "windows" {
require.Equal(stats.MemoryStats.Stats.Rss, ru.ResourceUsage.MemoryStats.RSS)
require.Equal(stats.MemoryStats.Stats.Cache, ru.ResourceUsage.MemoryStats.Cache)
@ -60,3 +63,66 @@ func TestDriver_DockerStatsCollector(t *testing.T) {
require.Fail("receiving stats should not block here")
}
}
// TestDriver_DockerUsageSender asserts that the TaskResourceUsage chan wrapper
// supports closing and sending on a chan from concurrent goroutines.
func TestDriver_DockerUsageSender(t *testing.T) {
	t.Parallel()

	// sample payload
	res := &cstructs.TaskResourceUsage{}

	destCh, recvCh := newStatsChanPipe()

	// Repeated sends must never fail or block: the first fills the
	// buffer, the rest are dropped.
	for i := 0; i < 3; i++ {
		destCh.send(res)
	}

	// Drain the one buffered value.
	<-recvCh

	// Exercise send, close, and receive from separate goroutines so the
	// race detector can flag any unsynchronized access.
	var wg sync.WaitGroup
	wg.Add(3)

	// Sender
	go func() {
		defer wg.Done()
		destCh.send(res)
	}()

	// Closer
	go func() {
		defer wg.Done()
		destCh.close()
	}()

	// Drain recvCh until it is closed
	go func() {
		defer wg.Done()
		for range recvCh {
		}
	}()

	wg.Wait()

	// The sender must report closed once close has returned.
	destCh.mu.Lock()
	closed := destCh.closed
	destCh.mu.Unlock()
	require.True(t, closed)

	// recvCh must also be closed: a receive completes immediately with
	// ok == false rather than hitting the default case.
	select {
	case _, ok := <-recvCh:
		require.False(t, ok)
	default:
		require.Fail(t, "expect recvCh to be closed")
	}

	// send and close remain safe no-ops after close.
	destCh.send(res)
	destCh.close()
	destCh.close()
	destCh.send(res)
}