client/driver: add image pull progress monitoring

Nick Ethier 2018-04-19 13:56:24 -04:00
parent 4774a16bcd
commit e35948ab91
3 changed files with 193 additions and 11 deletions


@@ -234,6 +234,7 @@ type DockerDriverConfig struct {
ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the containers root filesystem as read only
AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP
CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit.
ImagePullTimeout int64 `mapstructure:"image_pull_timeout"` // Timeout, in seconds, on the image pull after which the pull is cancelled
}
func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
@@ -736,6 +737,9 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
"cpu_hard_limit": {
Type: fields.TypeBool,
},
"image_pull_timeout": {
Type: fields.TypeInt,
},
},
}
@@ -765,6 +769,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord
cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
logger: d.logger,
removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
emitEvent: d.emitEvent,
}
return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
@@ -1546,7 +1551,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)
return coordinator.PullImage(driverConfig.ImageName, authOptions, time.Duration(driverConfig.ImagePullTimeout)*time.Second, callerID)
}
// authBackend encapsulates a function that resolves registry credentials.
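
The new image_pull_timeout option is an integer number of seconds: pullImage multiplies it by time.Second, and a value of 0 leaves the pull unbounded. A minimal standalone sketch of that conversion and the resulting deadline, using an illustrative value (not part of the commit):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Hypothetical value for image_pull_timeout, in seconds; 0 would mean no deadline.
	imagePullTimeout := int64(300)

	// Mirrors the conversion in pullImage and the deadline setup in pullImageImpl.
	pullTimeout := time.Duration(imagePullTimeout) * time.Second

	ctx, cancel := context.WithCancel(context.Background())
	if pullTimeout > 0 {
		ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(pullTimeout))
	}
	defer cancel()

	deadline, ok := ctx.Deadline()
	fmt.Printf("deadline set: %v (%s)\n", ok, deadline)
}
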


@@ -1,13 +1,17 @@
package driver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"regexp"
"sync"
"time"
"github.com/docker/docker/pkg/jsonmessage"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -23,6 +27,14 @@ var (
// imageNotFoundMatcher is a regex expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
// defaultPullActivityDeadline is the default amount of time the
// imageProgressManager tolerates between progress messages before
// cancelling the pull
defaultPullActivityDeadline = 2 * time.Minute
// defaultImageProgressReportInterval is the default interval at which the
// imageProgressManager invokes its progress reporter
defaultImageProgressReportInterval = 10 * time.Second
)
// pullFuture is a sharable future for retrieving a pulled image's ID and any
@@ -84,6 +96,9 @@ type dockerCoordinatorConfig struct {
// removeDelay is the delay between an image's reference count going to
// zero and the image actually being deleted.
removeDelay time.Duration
// emitEvent is the function used to emit an event to the task
emitEvent LogEventFn
}
// dockerCoordinator is used to coordinate actions against images to prevent
@@ -128,9 +143,135 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return globalCoordinator
}
type imageProgress struct {
sync.RWMutex
lastMessage *jsonmessage.JSONMessage
timestamp time.Time
}
func (p *imageProgress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.lastMessage == nil {
return "No progress", p.timestamp
}
var prefix string
if p.lastMessage.ID != "" {
prefix = fmt.Sprintf("%s:", p.lastMessage.ID)
}
if p.lastMessage.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.lastMessage.Status), p.timestamp
}
return fmt.Sprintf("%s%s %s", prefix, p.lastMessage.Status, p.lastMessage.Progress.String()), p.timestamp
}
func (p *imageProgress) set(msg *jsonmessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.lastMessage = msg
p.timestamp = time.Now()
}
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time)
type imageProgressManager struct {
*imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
pullStart time.Time
}
func newImageProgressManager(
image string, cancel context.CancelFunc,
inactivityFunc, reporter progressReporterFunc) *imageProgressManager {
return &imageProgressManager{
image: image,
activityDeadline: defaultPullActivityDeadline,
inactivityFunc: inactivityFunc,
reportInterval: defaultImageProgressReportInterval,
reporter: reporter,
imageProgress: &imageProgress{timestamp: time.Now()},
cancel: cancel,
stopCh: make(chan struct{}),
}
}
func (pm *imageProgressManager) withActivityDeadline(t time.Duration) *imageProgressManager {
pm.activityDeadline = t
return pm
}
func (pm *imageProgressManager) withReportInterval(t time.Duration) *imageProgressManager {
pm.reportInterval = t
return pm
}
func (pm *imageProgressManager) start() {
pm.pullStart = time.Now()
go func() {
ticker := time.NewTicker(pm.reportInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
msg, timestamp := pm.get()
if time.Now().Sub(timestamp) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, timestamp, pm.pullStart)
pm.cancel()
return
}
pm.reporter(pm.image, msg, timestamp, pm.pullStart)
case <-pm.stopCh:
return
}
}
}()
}
func (pm *imageProgressManager) stop() {
close(pm.stopCh)
}
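
A usage sketch of the progress manager, assumed to live in the same driver package; the image name, callbacks, and overridden intervals below are illustrative rather than taken from the commit:

package driver

import (
	"context"
	"log"
	"time"
)

// examplePullMonitoring is a hypothetical illustration, not part of the commit.
func examplePullMonitoring() {
	ctx, cancel := context.WithCancel(context.Background())
	_ = ctx // the real coordinator passes ctx into the Docker pull so cancel() aborts it
	defer cancel()

	pm := newImageProgressManager("redis:3.2", cancel,
		// inactivity handler: called once, right before the pull is cancelled
		func(image, msg string, ts, start time.Time) {
			log.Printf("[ERR] pull of %s stalled, last progress at %s: %s", image, ts, msg)
		},
		// periodic reporter
		func(image, msg string, ts, start time.Time) {
			log.Printf("[DEBUG] pull of %s (%s elapsed): %s", image, time.Since(start), msg)
		}).
		withActivityDeadline(30 * time.Second).
		withReportInterval(5 * time.Second)

	pm.start()
	defer pm.stop()
}

The inactivity callback doubles as the kill switch: when it fires, the manager also invokes the cancel function, which aborts the in-flight pull through the context wired into PullImageOptions below.
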
func (pm *imageProgressManager) Write(p []byte) (n int, err error) {
n, err = pm.buf.Write(p)
for {
line, err := pm.buf.ReadBytes('\n')
if err == io.EOF {
// Partial line in the buffer; write it back and wait for the rest
pm.buf.Write(line)
break
}
if err != nil {
return n, err
}
var msg jsonmessage.JSONMessage
err = json.Unmarshal(line, &msg)
if err != nil {
return n, err
}
if msg.Error != nil {
return n, msg.Error
}
pm.set(&msg)
}
return
}
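
Write implements io.Writer over Docker's raw pull output: with RawJSONStream set, go-dockerclient streams the daemon's newline-delimited jsonmessage objects into the OutputStream, and Write buffers them, decodes one complete line at a time, and records the latest message. A standalone sketch of that decoding, using made-up sample lines in the documented format:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/docker/docker/pkg/jsonmessage"
)

func main() {
	// Two made-up lines in the shape the Docker daemon emits during a pull.
	lines := [][]byte{
		[]byte(`{"status":"Downloading","progressDetail":{"current":1048576,"total":5242880},"id":"4fe2ade4980c"}`),
		[]byte(`{"status":"Pull complete","id":"4fe2ade4980c"}`),
	}

	for _, line := range lines {
		var msg jsonmessage.JSONMessage
		if err := json.Unmarshal(line, &msg); err != nil {
			panic(err)
		}
		// Mirror the formatting done by imageProgress.get
		if msg.Progress != nil {
			fmt.Printf("%s: %s %s\n", msg.ID, msg.Status, msg.Progress.String())
		} else {
			fmt.Printf("%s: %s\n", msg.ID, msg.Status)
		}
	}
}
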
// PullImage is used to pull an image. It returns the pulled image's ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
@@ -138,7 +279,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// Make the future
future = newPullFuture()
d.pullFutures[image] = future
go d.pullImageImpl(image, authOptions, future)
go d.pullImageImpl(image, authOptions, pullTimeout, future)
}
d.imageLock.Unlock()
@@ -165,15 +306,25 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) {
// Parse the repo and tag
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
ctx, cancel := context.WithCancel(context.Background())
if pullTimeout > 0 {
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(pullTimeout))
}
defer cancel()
pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport)
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
OutputStream: pm,
RawJSONStream: true,
Context: ctx,
}
// Attempt to pull the image
@@ -181,7 +332,17 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
if authOptions != nil {
auth = *authOptions
}
pm.start()
defer pm.stop()
err := d.client.PullImage(pullOptions, auth)
if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
d.logger.Printf("[ERR] driver.docker: timeout pulling container %s:%s", repo, tag)
future.set("", recoverablePullError(ctxErr, image))
return
}
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
future.set("", recoverablePullError(err, image))
@@ -337,6 +498,21 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
d.imageLock.Unlock()
}
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) {
d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg)
}
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) {
// Log progress at DEBUG once the pull has been running for more than 10 seconds
if timestamp.Sub(pullStart) > 10*time.Second {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
}
// Also surface progress as a task event once the pull has run for over two minutes
if timestamp.Sub(pullStart) > 2*time.Minute {
d.emitEvent("Docker image %s pull progress: %s", image, msg)
}
}
// recoverablePullError wraps the error received when trying to pull an image if
// the error is recoverable.
func recoverablePullError(err error, image string) error {


@@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, uuid.Generate())
id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate())
}()
}
@@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, callerIDs[i])
id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i])
}
// Check the reference count
@@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, 0, callerID)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
@@ -189,7 +189,7 @@ }
}
// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, callerID)
id, _ = coordinator.PullImage(image, nil, 0, callerID)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
@@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, 0, callerID)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {