client/driver: add seperate handler for emitting pull progress

This commit is contained in:
Nick Ethier 2018-05-03 14:39:03 -04:00
parent 0bdd976b7d
commit 77af17efbc
No known key found for this signature in database
GPG Key ID: 07C1A3ECED90D24A
2 changed files with 36 additions and 27 deletions

View File

@ -191,7 +191,8 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport)
pm := newImageProgressManager(image, cancel, d.handlePullInactivity,
d.handlePullProgressReport, d.handleSlowPullProgressReport)
defer pm.stop()
pullOptions := docker.PullImageOptions{
@ -394,16 +395,16 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}
}
}
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) {
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) {
d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg)
}
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) {
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
}
if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}
func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}
// recoverablePullError wraps the error gotten when trying to pull and image if

View File

@ -161,10 +161,9 @@ func (p *imageProgress) totalBytes() int64 {
}
// progressReporterFunc defines the method for handeling inactivity and report
// events from the imageProgressManager. The image name, current status message,
// timestamp of last received status update, timestamp of when the pull started
// and current report interation are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64)
// events from the imageProgressManager. The image name, current status message
// and timestamp of last received status update are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time)
// imageProgressManager tracks the progress of pulling a docker image from an
// image repository.
@ -172,20 +171,23 @@ type progressReporterFunc func(image string, msg string, timestamp time.Time, pu
// client pull image method in order to receive status updates from the docker
// engine api.
type imageProgressManager struct {
imageProgress *imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
imageProgress *imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
slowReportInterval time.Duration
slowReporter progressReporterFunc
lastSlowReport time.Time
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
}
func newImageProgressManager(
image string, cancel context.CancelFunc,
inactivityFunc, reporter progressReporterFunc) *imageProgressManager {
inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager {
pm := &imageProgressManager{
image: image,
@ -193,6 +195,7 @@ func newImageProgressManager(
inactivityFunc: inactivityFunc,
reportInterval: dockerImageProgressReportInterval,
reporter: reporter,
slowReporter: slowReporter,
imageProgress: &imageProgress{
timestamp: time.Now(),
layers: make(map[string]*layerProgress),
@ -207,21 +210,26 @@ func newImageProgressManager(
// start intiates the ticker to trigger the inactivity and reporter handlers
func (pm *imageProgressManager) start() {
pm.imageProgress.pullStart = time.Now()
now := time.Now()
pm.imageProgress.pullStart = now
pm.lastSlowReport = now
go func() {
ticker := time.NewTicker(dockerImageProgressReportInterval)
var interval int64
for {
interval++
select {
case <-ticker.C:
msg, timestamp := pm.imageProgress.get()
if time.Now().Sub(timestamp) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval)
msg, lastStatusTime := pm.imageProgress.get()
t := time.Now()
if t.Sub(lastStatusTime) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, lastStatusTime)
pm.cancel()
return
}
pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval)
if t.Sub(pm.lastSlowReport) > pm.slowReportInterval {
pm.slowReporter(pm.image, msg, lastStatusTime)
pm.lastSlowReport = t
}
pm.reporter(pm.image, msg, lastStatusTime)
case <-pm.stopCh:
return
}