client/driver: do accounting on layer pull progress

This commit is contained in:
Nick Ethier 2018-04-20 23:38:54 -04:00
parent 8efda7dc6c
commit 7c5821d7c6
No known key found for this signature in database
GPG key ID: 07C1A3ECED90D24A
5 changed files with 336 additions and 152 deletions

View file

@ -234,7 +234,8 @@ type DockerDriverConfig struct {
ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the containers root filesystem as read only
AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP
CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit.
ImagePullTimeout int64 `mapstructure:"image_pull_timeout"` // Timeout on the image pull after which the pull is cancelled
ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` //
ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled
}
func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
@ -304,6 +305,12 @@ func (c *DockerDriverConfig) Validate() error {
return err
}
c.Ulimit = ulimit
if len(c.ImagePullTimeoutRaw) > 0 {
c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw)
if err != nil {
return err
}
}
return nil
}
@ -738,7 +745,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
Type: fields.TypeBool,
},
"image_pull_timeout": {
Type: fields.TypeInt,
Type: fields.TypeString,
},
},
}
@ -1551,7 +1558,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID, d.emitEvent)
return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent)
}
// authBackend encapsulates a function that resolves registry credentials.

View file

@ -1,17 +1,13 @@
package driver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"regexp"
"sync"
"time"
"github.com/docker/docker/pkg/jsonmessage"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -27,14 +23,12 @@ var (
// imageNotFoundMatcher is a regex expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)
// defaultPullActivityDeadline is the default value set in the imageProgressManager
// when newImageProgressManager is called
defaultPullActivityDeadline = 2 * time.Minute
// defaultImageProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
defaultImageProgressReportInterval = 10 * time.Second
const (
// dockerPullProgressEmitInterval is the interval at which the pull progress
// is emitted to the allocation
dockerPullProgressEmitInterval = 2 * time.Minute
)
// pullFuture is a sharable future for retrieving a pulled images ID and any
@ -147,132 +141,6 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return globalCoordinator
}
type imageProgress struct {
sync.RWMutex
lastMessage *jsonmessage.JSONMessage
timestamp time.Time
}
func (p *imageProgress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.lastMessage == nil {
return "No progress", p.timestamp
}
var prefix string
if p.lastMessage.ID != "" {
prefix = fmt.Sprintf("%s:", p.lastMessage.ID)
}
if p.lastMessage.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.lastMessage.Status), p.timestamp
}
return fmt.Sprintf("%s%s %s", prefix, p.lastMessage.Status, p.lastMessage.Progress.String()), p.timestamp
}
func (p *imageProgress) set(msg *jsonmessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.lastMessage = msg
p.timestamp = time.Now()
}
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time)
type imageProgressManager struct {
*imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
pullStart time.Time
}
func newImageProgressManager(
image string, cancel context.CancelFunc,
inactivityFunc, reporter progressReporterFunc) *imageProgressManager {
return &imageProgressManager{
image: image,
activityDeadline: defaultPullActivityDeadline,
inactivityFunc: inactivityFunc,
reportInterval: defaultImageProgressReportInterval,
reporter: reporter,
imageProgress: &imageProgress{timestamp: time.Now()},
cancel: cancel,
stopCh: make(chan struct{}),
}
}
func (pm *imageProgressManager) withActivityDeadline(t time.Duration) *imageProgressManager {
pm.activityDeadline = t
return pm
}
func (pm *imageProgressManager) withReportInterval(t time.Duration) *imageProgressManager {
pm.reportInterval = t
return pm
}
func (pm *imageProgressManager) start() {
pm.pullStart = time.Now()
go func() {
ticker := time.NewTicker(defaultImageProgressReportInterval)
for {
select {
case <-ticker.C:
msg, timestamp := pm.get()
if time.Now().Sub(timestamp) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, timestamp, pm.pullStart)
pm.cancel()
return
}
pm.reporter(pm.image, msg, timestamp, pm.pullStart)
case <-pm.stopCh:
return
}
}
}()
}
func (pm *imageProgressManager) stop() {
close(pm.stopCh)
}
func (pm *imageProgressManager) Write(p []byte) (n int, err error) {
n, err = pm.buf.Write(p)
for {
line, err := pm.buf.ReadBytes('\n')
if err == io.EOF {
break
}
if err != nil {
return n, err
}
var msg jsonmessage.JSONMessage
err = json.Unmarshal(line, &msg)
if err != nil {
return n, err
}
if msg.Error != nil {
return n, msg.Error
}
pm.set(&msg)
}
return
}
// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) {
@ -319,10 +187,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
tag = "latest"
}
ctx, cancel := context.WithCancel(context.Background())
if pullTimeout > 0 {
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(pullTimeout))
}
defer cancel()
if pullTimeout > 0 {
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout))
}
pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport)
pullOptions := docker.PullImageOptions{
@ -533,11 +401,9 @@ func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, p
}
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) {
if timestamp.Sub(pullStart) > 10*time.Second {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
}
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
if timestamp.Sub(pullStart) > 2*time.Minute {
if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}
}

View file

@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate())
id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil)
}()
}
@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i])
id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil)
}
// Check the reference count
@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, 0, callerID)
id, _ := coordinator.PullImage(image, nil, 0, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
}
// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, 0, callerID)
id, _ = coordinator.PullImage(image, nil, 0, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, 0, callerID)
id, _ := coordinator.PullImage(image, nil, 0, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {

View file

@ -0,0 +1,259 @@
package driver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/docker/docker/pkg/jsonmessage"
units "github.com/docker/go-units"
)
const (
// defaultPullActivityDeadline is the activityDeadline that
// newImageProgressManager assigns to a new imageProgressManager. The
// manager compares it with the time elapsed since the last status update
// and cancels the pull once that time exceeds it.
defaultPullActivityDeadline = 2 * time.Minute
// defaultImageProgressReportInterval is the reportInterval that
// newImageProgressManager assigns to a new imageProgressManager. It is the
// period at which pull progress is checked and reported by default.
defaultImageProgressReportInterval = 10 * time.Second
)
// layerProgress tracks the state and downloaded bytes of a single layer within
// a docker image.
type layerProgress struct {
// id is the layer identifier taken from the "id" field of the status
// message; it is also the key of the layer in imageProgress.layers.
id string
// status is the most recent recognized status reported for the layer.
status layerProgressStatus
// currentBytes is the number of bytes downloaded so far, as last reported
// by a "Downloading" message. It is set equal to totalBytes once the layer
// reports "Download complete".
currentBytes int64
// totalBytes is the layer size as last reported by a "Downloading"
// message. It stays zero until such a message has been seen.
totalBytes int64
}
// layerProgressStatus enumerates the states a single image layer moves through
// while it is being pulled. The values are ordered: every state greater than
// layerProgressStatusVerifying means the layer's bytes are already present
// locally.
type layerProgressStatus int

const (
	// layerProgressStatusUnknown is the zero value, used for any status
	// string that is not recognized.
	layerProgressStatusUnknown layerProgressStatus = iota
	// layerProgressStatusStarting corresponds to "Pulling fs layer".
	layerProgressStatusStarting
	// layerProgressStatusWaiting corresponds to "Waiting".
	layerProgressStatusWaiting
	// layerProgressStatusDownloading corresponds to "Downloading".
	layerProgressStatusDownloading
	// layerProgressStatusVerifying corresponds to "Verifying Checksum".
	layerProgressStatusVerifying
	// layerProgressStatusDownloaded corresponds to "Download complete".
	layerProgressStatusDownloaded
	// layerProgressStatusExtracting corresponds to "Extracting".
	layerProgressStatusExtracting
	// layerProgressStatusComplete corresponds to "Pull complete".
	layerProgressStatusComplete
	// layerProgressStatusExists corresponds to "Already exists".
	layerProgressStatusExists
)

// layerStatusByMessage maps the exact status text found in a pull status
// message to the matching layerProgressStatus. It is read-only after
// initialization.
var layerStatusByMessage = map[string]layerProgressStatus{
	"Pulling fs layer":   layerProgressStatusStarting,
	"Waiting":            layerProgressStatusWaiting,
	"Downloading":        layerProgressStatusDownloading,
	"Verifying Checksum": layerProgressStatusVerifying,
	"Download complete":  layerProgressStatusDownloaded,
	"Extracting":         layerProgressStatusExtracting,
	"Pull complete":      layerProgressStatusComplete,
	"Already exists":     layerProgressStatusExists,
}

// lpsFromString converts the status text of a pull status message into a
// layerProgressStatus. The match is exact and case sensitive; any other text
// yields layerProgressStatusUnknown.
func lpsFromString(status string) layerProgressStatus {
	if known, found := layerStatusByMessage[status]; found {
		return known
	}
	return layerProgressStatusUnknown
}
// imageProgress tracks the status of each child layer as it is pulled from a
// docker image repo. The embedded RWMutex guards all of the fields below.
type imageProgress struct {
sync.RWMutex
// lastMessage is the most recent status message passed to set; it is nil
// until the first message arrives.
lastMessage *jsonmessage.JSONMessage
// timestamp is the time of the most recent call to set.
timestamp time.Time
// layers holds the per-layer progress, keyed by the layer id carried in
// the status messages.
layers map[string]*layerProgress
// pullStart is the time the pull began; get uses it to compute the elapsed
// time behind the remaining-time estimate.
pullStart time.Time
}
// get returns a human readable summary of the pull together with the
// timestamp of the last status update.
//
// Until a first status message has been recorded the summary is
// "No progress". Afterwards it reports how many layers are pulled (any state
// past checksum verification), how many are currently downloading, the bytes
// downloaded out of the bytes known so far, and a linear estimate of the time
// remaining.
//
// The estimate is elapsed * (total - current) / current. It is computed in
// floating point because integer nanoseconds-per-byte truncates badly (to zero
// for fast downloads), and it is reported as zero when no bytes have been
// downloaded yet or when the known total does not exceed the downloaded
// amount, so a negative duration is never shown.
func (p *imageProgress) get() (string, time.Time) {
	p.RLock()
	defer p.RUnlock()

	if p.lastMessage == nil {
		return "No progress", p.timestamp
	}

	var pulled, pulling int
	for _, l := range p.layers {
		switch {
		case l.status == layerProgressStatusDownloading:
			pulling++
		case l.status > layerProgressStatusVerifying:
			// Downloaded, extracting, complete or already present.
			pulled++
		}
	}

	elapsed := time.Since(p.pullStart)
	cur := p.currentBytes()
	total := p.totalBytes()

	var est time.Duration
	if cur > 0 && total > cur {
		est = time.Duration(float64(elapsed) * float64(total-cur) / float64(cur))
	}

	return fmt.Sprintf("Pulled %d/%d (%s/%s) pulling %d layers - est %.1fs remaining",
		pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)), pulling,
		est.Seconds()), p.timestamp
}
// set records a status message received from the docker engine api during an
// image pull and updates the state of the corresponding layer.
//
// Every message refreshes lastMessage and timestamp. Only messages whose
// status text is recognized by lpsFromString touch the layer table; a layer
// is created the first time its id is seen.
func (p *imageProgress) set(msg *jsonmessage.JSONMessage) {
	p.Lock()
	defer p.Unlock()

	p.lastMessage, p.timestamp = msg, time.Now()

	status := lpsFromString(msg.Status)
	if status == layerProgressStatusUnknown {
		// Not a per-layer status; nothing else to track.
		return
	}

	entry, known := p.layers[msg.ID]
	if !known {
		entry = &layerProgress{id: msg.ID}
		p.layers[msg.ID] = entry
	}
	entry.status = status

	switch status {
	case layerProgressStatusDownloading:
		// Byte counts are only taken from messages that carry them.
		if msg.Progress != nil {
			entry.currentBytes = msg.Progress.Current
			entry.totalBytes = msg.Progress.Total
		}
	case layerProgressStatusDownloaded:
		// The completion message carries no byte counts; mark the layer as
		// fully downloaded.
		entry.currentBytes = entry.totalBytes
	}
}
// currentBytes returns the number of bytes downloaded so far, summed over
// every tracked layer. It takes no lock itself: the caller must hold at least
// a read lock on the imageProgress.
func (p *imageProgress) currentBytes() (sum int64) {
	for id := range p.layers {
		sum += p.layers[id].currentBytes
	}
	return sum
}
// totalBytes returns the total size in bytes, summed over every tracked
// layer. Layers whose size has not been reported yet contribute zero. It
// takes no lock itself: the caller must hold at least a read lock on the
// imageProgress.
func (p *imageProgress) totalBytes() (sum int64) {
	for id := range p.layers {
		sum += p.layers[id].totalBytes
	}
	return sum
}
// progressReporterFunc defines the signature of the handlers that the
// imageProgressManager calls for inactivity and progress report events. It
// receives the image name, the current status message, the timestamp of the
// last received status update and the timestamp of when the pull started.
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time)
// imageProgressManager tracks the progress of pulling a docker image from an
// image repository.
// It also implements the io.Writer interface so that it can be passed to the
// docker client pull image method in order to receive status updates from the
// docker engine api.
type imageProgressManager struct {
// imageProgress accumulates the per-layer state decoded by Write.
imageProgress *imageProgress
// image is the image name handed to the handler functions.
image string
// activityDeadline is the longest time allowed between status updates
// before the pull is considered inactive and cancelled.
activityDeadline time.Duration
// inactivityFunc is called once when activityDeadline is exceeded, just
// before cancel.
inactivityFunc progressReporterFunc
// reportInterval is the progress reporting interval;
// newImageProgressManager sets it to defaultImageProgressReportInterval.
reportInterval time.Duration
// reporter is called on every tick while the pull is still active.
reporter progressReporterFunc
// cancel is called to abort the pull once it is found to be inactive.
cancel context.CancelFunc
// stopCh is closed by stop to end the goroutine launched by start.
stopCh chan struct{}
// buf holds bytes passed to Write that do not yet form a complete
// newline-terminated line.
buf bytes.Buffer
}
// newImageProgressManager builds an imageProgressManager for the named image.
//
// cancel is called to abort the pull when no status update has been seen for
// the activity deadline; inactivityFunc is notified just before that happens
// and reporter is notified on every regular tick. The deadline and the report
// interval start at defaultPullActivityDeadline and
// defaultImageProgressReportInterval. The returned manager is idle until
// start is called.
func newImageProgressManager(
	image string, cancel context.CancelFunc,
	inactivityFunc, reporter progressReporterFunc) *imageProgressManager {

	// Stamp the progress now so the inactivity clock has a starting point
	// before the first status message arrives.
	progress := &imageProgress{
		timestamp: time.Now(),
		layers:    make(map[string]*layerProgress),
	}

	pm := &imageProgressManager{
		imageProgress:    progress,
		image:            image,
		cancel:           cancel,
		stopCh:           make(chan struct{}),
		activityDeadline: defaultPullActivityDeadline,
		reportInterval:   defaultImageProgressReportInterval,
		inactivityFunc:   inactivityFunc,
		reporter:         reporter,
	}
	return pm
}
// start records the pull start time and launches the goroutine that
// periodically inspects the pull.
//
// On every tick the latest status is fetched. If no status update has been
// received for longer than activityDeadline, inactivityFunc is called, the
// pull is cancelled and the goroutine exits. Otherwise reporter is called
// with the current status. The goroutine also exits when stop is called.
//
// The tick period is reportInterval; a manager whose reportInterval is unset
// (zero or negative) falls back to defaultImageProgressReportInterval, since
// time.NewTicker panics on a non-positive period.
func (pm *imageProgressManager) start() {
	pm.imageProgress.pullStart = time.Now()

	interval := pm.reportInterval
	if interval <= 0 {
		interval = defaultImageProgressReportInterval
	}

	go func() {
		ticker := time.NewTicker(interval)
		// Release the ticker's resources on every exit path.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				msg, lastUpdate := pm.imageProgress.get()
				if time.Since(lastUpdate) > pm.activityDeadline {
					pm.inactivityFunc(pm.image, msg, lastUpdate, pm.imageProgress.pullStart)
					pm.cancel()
					return
				}
				pm.reporter(pm.image, msg, lastUpdate, pm.imageProgress.pullStart)
			case <-pm.stopCh:
				return
			}
		}
	}()
}
// stop signals the goroutine launched by start to exit by closing stopCh.
// Closing an already closed channel panics, so stop must be called at most
// once per manager.
func (pm *imageProgressManager) stop() {
close(pm.stopCh)
}
// Write implements io.Writer. It accepts the newline-delimited JSON status
// stream produced by the docker engine api during an image pull.
//
// The bytes are appended to an internal buffer and every complete line is
// decoded into a jsonmessage.JSONMessage and recorded with imageProgress.set.
// A trailing partial line is kept in the buffer until a later Write completes
// it.
//
// n is the number of bytes accepted into the buffer. A non-nil error is
// returned when a line is not valid JSON or when the engine reports an error
// in the stream; lines that follow in the same call are then left in the
// buffer.
func (pm *imageProgressManager) Write(p []byte) (n int, err error) {
	n, err = pm.buf.Write(p)

	for {
		line, readErr := pm.buf.ReadBytes('\n')
		if readErr == io.EOF {
			// Partial write of line; push back onto buffer and break until full line
			pm.buf.Write(line)
			break
		}
		if readErr != nil {
			return n, readErr
		}

		// Decode into a fresh message for every line: json.Unmarshal leaves
		// fields that are absent from the input untouched, so reusing one
		// value would carry the id or progress of an earlier line into this
		// one, and would mutate the message already handed to set.
		var msg jsonmessage.JSONMessage
		if jsonErr := json.Unmarshal(line, &msg); jsonErr != nil {
			return n, jsonErr
		}

		if msg.Error != nil {
			// error received from the docker engine api
			return n, msg.Error
		}

		pm.imageProgress.set(&msg)
	}

	return n, err
}

View file

@ -0,0 +1,52 @@
package driver
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
// Test_DockerImageProgressManager feeds a recorded docker pull status stream
// into imageProgressManager.Write and checks the layer count and byte
// accounting after each step.
func Test_DockerImageProgressManager(t *testing.T) {
// Only the fields that Write and the byte accessors touch are populated;
// start is never called, so no handlers or channels are needed.
pm := &imageProgressManager{
imageProgress: &imageProgress{
timestamp: time.Now(),
layers: make(map[string]*layerProgress),
},
}
// The first line is an image-level status and must not create a layer; the
// two "Pulling fs layer" lines create one layer each.
_, err := pm.Write([]byte(`{"status":"Pulling from library/golang","id":"1.9.5"}
{"status":"Pulling fs layer","progressDetail":{},"id":"c73ab1c6897b"}
{"status":"Pulling fs layer","progressDetail":{},"id":"1ab373b3deae"}
`))
require.NoError(t, err)
require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2")
// No download progress has been reported yet.
cur := pm.imageProgress.currentBytes()
require.Zero(t, cur)
tot := pm.imageProgress.totalBytes()
require.Zero(t, tot)
// A line split across two writes must be buffered and only take effect
// once its terminating newline arrives.
_, err = pm.Write([]byte(`{"status":"Pulling fs layer","progress`))
require.NoError(t, err)
require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2")
_, err = pm.Write([]byte(`Detail":{},"id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
// Download progress: the later update for b542772b4177 replaces the earlier
// one, so current = 694257 + 113576 and total = 4335495 + 11108010.
_, err = pm.Write([]byte(`{"status":"Downloading","progressDetail":{"current":45800,"total":4335495},"progress":"[\u003e ] 45.8kB/4.335MB","id":"b542772b4177"}
{"status":"Downloading","progressDetail":{"current":113576,"total":11108010},"progress":"[\u003e ] 113.6kB/11.11MB","id":"1ab373b3deae"}
{"status":"Downloading","progressDetail":{"current":694257,"total":4335495},"progress":"[========\u003e ] 694.3kB/4.335MB","id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
require.Equal(t, int64(807833), pm.imageProgress.currentBytes())
require.Equal(t, int64(15443505), pm.imageProgress.totalBytes())
// "Download complete" sets the layer's current bytes to its total:
// current = 4335495 + 113576.
_, err = pm.Write([]byte(`{"status":"Download complete","progressDetail":{},"id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
require.Equal(t, int64(4449071), pm.imageProgress.currentBytes())
require.Equal(t, int64(15443505), pm.imageProgress.totalBytes())
}