9fea54e0dc
plugins/driver: update driver interface to support streaming stats client/tr: use streaming stats api TODO: * how to handle errors and closed channel during stats streaming * prevent tight loop if Stats(ctx) returns an error drivers: update drivers TaskStats RPC to handle streaming results executor: better error handling in stats rpc docker: better control and error handling of stats rpc driver: allow stats to return a recoverable error
213 lines
5.7 KiB
Go
213 lines
5.7 KiB
Go
package docker
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/armon/circbuf"
|
|
docker "github.com/fsouza/go-dockerclient"
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
plugin "github.com/hashicorp/go-plugin"
|
|
"github.com/hashicorp/nomad/drivers/docker/docklog"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/plugins/shared"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
type taskHandle struct {
|
|
client *docker.Client
|
|
waitClient *docker.Client
|
|
logger hclog.Logger
|
|
dlogger docklog.DockerLogger
|
|
dloggerPluginClient *plugin.Client
|
|
task *drivers.TaskConfig
|
|
containerID string
|
|
containerImage string
|
|
resourceUsageLock sync.RWMutex
|
|
resourceUsage *drivers.TaskResourceUsage
|
|
doneCh chan bool
|
|
waitCh chan struct{}
|
|
removeContainerOnExit bool
|
|
net *drivers.DriverNetwork
|
|
|
|
exitResult *drivers.ExitResult
|
|
exitResultLock sync.Mutex
|
|
}
|
|
|
|
func (h *taskHandle) ExitResult() *drivers.ExitResult {
|
|
h.exitResultLock.Lock()
|
|
defer h.exitResultLock.Unlock()
|
|
return h.exitResult.Copy()
|
|
}
|
|
|
|
type taskHandleState struct {
|
|
// ReattachConfig for the docker logger plugin
|
|
ReattachConfig *shared.ReattachConfig
|
|
|
|
ContainerID string
|
|
DriverNetwork *drivers.DriverNetwork
|
|
}
|
|
|
|
func (h *taskHandle) buildState() *taskHandleState {
|
|
return &taskHandleState{
|
|
ReattachConfig: shared.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()),
|
|
ContainerID: h.containerID,
|
|
DriverNetwork: h.net,
|
|
}
|
|
}
|
|
|
|
func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) {
|
|
fullCmd := make([]string, len(args)+1)
|
|
fullCmd[0] = cmd
|
|
copy(fullCmd[1:], args)
|
|
createExecOpts := docker.CreateExecOptions{
|
|
AttachStdin: false,
|
|
AttachStdout: true,
|
|
AttachStderr: true,
|
|
Tty: false,
|
|
Cmd: fullCmd,
|
|
Container: h.containerID,
|
|
Context: ctx,
|
|
}
|
|
exec, err := h.client.CreateExec(createExecOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
execResult := &drivers.ExecTaskResult{ExitResult: &drivers.ExitResult{}}
|
|
stdout, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize))
|
|
stderr, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize))
|
|
startOpts := docker.StartExecOptions{
|
|
Detach: false,
|
|
Tty: false,
|
|
OutputStream: stdout,
|
|
ErrorStream: stderr,
|
|
Context: ctx,
|
|
}
|
|
if err := client.StartExec(exec.ID, startOpts); err != nil {
|
|
return nil, err
|
|
}
|
|
execResult.Stdout = stdout.Bytes()
|
|
execResult.Stderr = stderr.Bytes()
|
|
res, err := client.InspectExec(exec.ID)
|
|
if err != nil {
|
|
return execResult, err
|
|
}
|
|
|
|
execResult.ExitResult.ExitCode = res.ExitCode
|
|
return execResult, nil
|
|
}
|
|
|
|
func (h *taskHandle) Signal(s os.Signal) error {
|
|
// Convert types
|
|
sysSig, ok := s.(syscall.Signal)
|
|
if !ok {
|
|
return fmt.Errorf("Failed to determine signal number")
|
|
}
|
|
|
|
// TODO When we expose signals we will need a mapping layer that converts
|
|
// MacOS signals to the correct signal number for docker. Or we change the
|
|
// interface to take a signal string and leave it up to driver to map?
|
|
|
|
dockerSignal := docker.Signal(sysSig)
|
|
opts := docker.KillContainerOptions{
|
|
ID: h.containerID,
|
|
Signal: dockerSignal,
|
|
}
|
|
return h.client.KillContainer(opts)
|
|
|
|
}
|
|
|
|
// Kill is used to terminate the task.
|
|
func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error {
|
|
// Only send signal if killTimeout is set, otherwise stop container
|
|
if killTimeout > 0 {
|
|
if err := h.Signal(signal); err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case <-h.waitCh:
|
|
return nil
|
|
case <-time.After(killTimeout):
|
|
}
|
|
}
|
|
|
|
// Stop the container
|
|
err := h.client.StopContainer(h.containerID, 0)
|
|
if err != nil {
|
|
|
|
// Container has already been removed.
|
|
if strings.Contains(err.Error(), NoSuchContainerError) {
|
|
h.logger.Debug("attempted to stop nonexistent container")
|
|
return nil
|
|
}
|
|
// Container has already been stopped.
|
|
if strings.Contains(err.Error(), ContainerNotRunningError) {
|
|
h.logger.Debug("attempted to stop an not-running container")
|
|
return nil
|
|
}
|
|
|
|
h.logger.Error("failed to stop container", "error", err)
|
|
return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err)
|
|
}
|
|
h.logger.Info("stopped container")
|
|
return nil
|
|
}
|
|
|
|
func (h *taskHandle) run() {
|
|
exitCode, werr := h.waitClient.WaitContainer(h.containerID)
|
|
if werr != nil {
|
|
h.logger.Error("failed to wait for container; already terminated")
|
|
}
|
|
|
|
if exitCode != 0 {
|
|
werr = fmt.Errorf("Docker container exited with non-zero exit code: %d", exitCode)
|
|
}
|
|
|
|
container, ierr := h.waitClient.InspectContainer(h.containerID)
|
|
oom := false
|
|
if ierr != nil {
|
|
h.logger.Error("failed to inspect container", "error", ierr)
|
|
} else if container.State.OOMKilled {
|
|
oom = true
|
|
werr = fmt.Errorf("OOM Killed")
|
|
}
|
|
|
|
// Shutdown stats collection
|
|
close(h.doneCh)
|
|
|
|
// Stop the container just incase the docker daemon's wait returned
|
|
// incorrectly
|
|
if err := h.client.StopContainer(h.containerID, 0); err != nil {
|
|
_, noSuchContainer := err.(*docker.NoSuchContainer)
|
|
_, containerNotRunning := err.(*docker.ContainerNotRunning)
|
|
if !containerNotRunning && !noSuchContainer {
|
|
h.logger.Error("error stopping container", "error", err)
|
|
}
|
|
}
|
|
|
|
// Remove the container
|
|
if h.removeContainerOnExit == true {
|
|
if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil {
|
|
h.logger.Error("error removing container", "error", err)
|
|
}
|
|
} else {
|
|
h.logger.Debug("not removing container due to config")
|
|
}
|
|
|
|
// Set the result
|
|
h.exitResultLock.Lock()
|
|
h.exitResult = &drivers.ExitResult{
|
|
ExitCode: exitCode,
|
|
Signal: 0,
|
|
OOMKilled: oom,
|
|
Err: werr,
|
|
}
|
|
h.exitResultLock.Unlock()
|
|
close(h.waitCh)
|
|
}
|