open-nomad/drivers/docker/progress.go
2023-04-10 15:36:59 +00:00

289 lines
7.5 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/docker/pkg/jsonmessage"
units "github.com/docker/go-units"
)
const (
// dockerImageProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
dockerImageProgressReportInterval = 10 * time.Second
// dockerImageSlowProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
dockerImageSlowProgressReportInterval = 2 * time.Minute
)
// layerProgress tracks the state and downloaded bytes of a single layer within
// a docker image
type layerProgress struct {
id string
status layerProgressStatus
currentBytes int64
totalBytes int64
}
type layerProgressStatus int
const (
layerProgressStatusUnknown layerProgressStatus = iota
layerProgressStatusStarting
layerProgressStatusWaiting
layerProgressStatusDownloading
layerProgressStatusVerifying
layerProgressStatusDownloaded
layerProgressStatusExtracting
layerProgressStatusComplete
layerProgressStatusExists
)
func lpsFromString(status string) layerProgressStatus {
switch status {
case "Pulling fs layer":
return layerProgressStatusStarting
case "Waiting":
return layerProgressStatusWaiting
case "Downloading":
return layerProgressStatusDownloading
case "Verifying Checksum":
return layerProgressStatusVerifying
case "Download complete":
return layerProgressStatusDownloaded
case "Extracting":
return layerProgressStatusExtracting
case "Pull complete":
return layerProgressStatusComplete
case "Already exists":
return layerProgressStatusExists
default:
return layerProgressStatusUnknown
}
}
// imageProgress tracks the status of each child layer as its pulled from a
// docker image repo
type imageProgress struct {
sync.RWMutex
lastMessage *jsonmessage.JSONMessage
timestamp time.Time
layers map[string]*layerProgress
pullStart time.Time
}
// get returns a status message and the timestamp of the last status update
func (p *imageProgress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.lastMessage == nil {
return "No progress", p.timestamp
}
var pulled, pulling, waiting int
for _, l := range p.layers {
switch {
case l.status == layerProgressStatusStarting ||
l.status == layerProgressStatusWaiting:
waiting++
case l.status == layerProgressStatusDownloading ||
l.status == layerProgressStatusVerifying:
pulling++
case l.status >= layerProgressStatusDownloaded:
pulled++
}
}
elapsed := time.Since(p.pullStart)
cur := p.currentBytes()
total := p.totalBytes()
var est int64
if cur != 0 {
est = (elapsed.Nanoseconds() / cur * total) - elapsed.Nanoseconds()
}
var msg strings.Builder
fmt.Fprintf(&msg, "Pulled %d/%d (%s/%s) layers: %d waiting/%d pulling",
pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)),
waiting, pulling)
if est > 0 {
fmt.Fprintf(&msg, " - est %.1fs remaining", time.Duration(est).Seconds())
}
return msg.String(), p.timestamp
}
// set takes a status message received from the docker engine api during an image
// pull and updates the status of the corresponding layer
func (p *imageProgress) set(msg *jsonmessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.lastMessage = msg
p.timestamp = time.Now()
lps := lpsFromString(msg.Status)
if lps == layerProgressStatusUnknown {
return
}
layer, ok := p.layers[msg.ID]
if !ok {
layer = &layerProgress{id: msg.ID}
p.layers[msg.ID] = layer
}
layer.status = lps
if msg.Progress != nil && lps == layerProgressStatusDownloading {
layer.currentBytes = msg.Progress.Current
layer.totalBytes = msg.Progress.Total
} else if lps == layerProgressStatusDownloaded {
layer.currentBytes = layer.totalBytes
}
}
// currentBytes iterates through all image layers and sums the total of
// current bytes. The caller is responsible for acquiring a read lock on the
// imageProgress struct
func (p *imageProgress) currentBytes() int64 {
var b int64
for _, l := range p.layers {
b += l.currentBytes
}
return b
}
// totalBytes iterates through all image layers and sums the total of
// total bytes. The caller is responsible for acquiring a read lock on the
// imageProgress struct
func (p *imageProgress) totalBytes() int64 {
var b int64
for _, l := range p.layers {
b += l.totalBytes
}
return b
}
// progressReporterFunc defines the method for handling inactivity and report
// events from the imageProgressManager. The image name, current status message
// and timestamp of last received status update are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time)
// imageProgressManager tracks the progress of pulling a docker image from an
// image repository.
// It also implemented the io.Writer interface so as to be passed to the docker
// client pull image method in order to receive status updates from the docker
// engine api.
type imageProgressManager struct {
imageProgress *imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
slowReportInterval time.Duration
slowReporter progressReporterFunc
lastSlowReport time.Time
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
}
func newImageProgressManager(
image string, cancel context.CancelFunc,
pullActivityTimeout time.Duration, inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager {
pm := &imageProgressManager{
image: image,
activityDeadline: pullActivityTimeout,
inactivityFunc: inactivityFunc,
reportInterval: dockerImageProgressReportInterval,
reporter: reporter,
slowReportInterval: dockerImageSlowProgressReportInterval,
slowReporter: slowReporter,
imageProgress: &imageProgress{
timestamp: time.Now(),
layers: make(map[string]*layerProgress),
},
cancel: cancel,
stopCh: make(chan struct{}),
}
pm.start()
return pm
}
// start intiates the ticker to trigger the inactivity and reporter handlers
func (pm *imageProgressManager) start() {
now := time.Now()
pm.imageProgress.pullStart = now
pm.lastSlowReport = now
go func() {
ticker := time.NewTicker(dockerImageProgressReportInterval)
for {
select {
case <-ticker.C:
msg, lastStatusTime := pm.imageProgress.get()
t := time.Now()
if t.Sub(lastStatusTime) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, lastStatusTime)
pm.cancel()
return
}
if t.Sub(pm.lastSlowReport) > pm.slowReportInterval {
pm.slowReporter(pm.image, msg, lastStatusTime)
pm.lastSlowReport = t
}
pm.reporter(pm.image, msg, lastStatusTime)
case <-pm.stopCh:
return
}
}
}()
}
func (pm *imageProgressManager) stop() {
close(pm.stopCh)
}
func (pm *imageProgressManager) Write(p []byte) (n int, err error) {
n, err = pm.buf.Write(p)
var msg jsonmessage.JSONMessage
for {
line, err := pm.buf.ReadBytes('\n')
if err == io.EOF {
// Partial write of line; push back onto buffer and break until full line
pm.buf.Write(line)
break
}
if err != nil {
return n, err
}
err = json.Unmarshal(line, &msg)
if err != nil {
return n, err
}
if msg.Error != nil {
// error received from the docker engine api
return n, msg.Error
}
pm.imageProgress.set(&msg)
}
return
}