Alex Dadgar 693f244cce Plugin client's handle plugin dying
This PR plumbs the plugins done ctx through the base and driver plugin
clients (device already had it). Further, it adds generic handling of
gRPC stream errors.
2018-11-12 17:09:27 -08:00

1015 lines
33 KiB

// +build linux
package rkt
import (
appcschema ""
dstructs ""
cstructs ""
rktv1 ""
const (
// pluginName is the name of the plugin
pluginName = "rkt"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
// minRktVersion is the earliest supported version of rkt. rkt added support
// for CPU and memory isolators in 0.14.0. We cannot support an earlier
// version to maintain an uniform interface across all drivers
minRktVersion = "1.27.0"
// rktCmd is the command rkt is installed as.
rktCmd = "rkt"
// networkDeadline is how long to wait for container network
// information to become available.
networkDeadline = 1 * time.Minute
var (
// PluginID is the rawexec plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
// PluginConfig is the rawexec factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewRktDriver(l) },
// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
conf := map[string]interface{}{}
if v, err := strconv.ParseBool(opts["driver.rkt.volumes.enabled"]); err == nil {
conf["volumes_enabled"] = v
return conf, nil
var (
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
PluginVersion: "0.1.0",
Name: pluginName,
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"volumes_enabled": hclspec.NewDefault(
hclspec.NewAttr("volumes_enabled", "bool", false),
// taskConfigSpec is the hcl specification for the driver config section of
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"image": hclspec.NewAttr("image", "string", true),
"command": hclspec.NewAttr("command", "string", false),
"args": hclspec.NewAttr("args", "list(string)", false),
"trust_prefix": hclspec.NewAttr("trust_prefix", "string", false),
"dns_servers": hclspec.NewAttr("dns_servers", "list(string)", false),
"dns_search_domains": hclspec.NewAttr("dns_search_domains", "list(string)", false),
"net": hclspec.NewAttr("net", "list(string)", false),
"port_map": hclspec.NewBlockAttrs("port_map", "string", false),
"volumes": hclspec.NewAttr("volumes", "list(string)", false),
"insecure_options": hclspec.NewAttr("insecure_options", "list(string)", false),
"no_overlay": hclspec.NewAttr("no_overlay", "bool", false),
"debug": hclspec.NewAttr("debug", "bool", false),
"group": hclspec.NewAttr("group", "string", false),
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
capabilities = &drivers.Capabilities{
SendSignals: true,
Exec: true,
FSIsolation: cstructs.FSIsolationChroot,
reRktVersion = regexp.MustCompile(`rkt [vV]ersion[:]? (\d[.\d]+)`)
reAppcVersion = regexp.MustCompile(`appc [vV]ersion[:]? (\d[.\d]+)`)
// Config is the client configuration for the driver
type Config struct {
// VolumesEnabled allows tasks to bind host paths (volumes) inside their
// container. Binding relative paths is always allowed and will be resolved
// relative to the allocation's directory.
VolumesEnabled bool `codec:"volumes_enabled"`
// TaskConfig is the driver configuration of a taskConfig within a job
type TaskConfig struct {
ImageName string `codec:"image"`
Command string `codec:"command"`
Args []string `codec:"args"`
TrustPrefix string `codec:"trust_prefix"`
DNSServers []string `codec:"dns_servers"` // DNS Server for containers
DNSSearchDomains []string `codec:"dns_search_domains"` // DNS Search domains for containers
Net []string `codec:"net"` // Networks for the containers
PortMap map[string]string `codec:"port_map"` // A map of host port and the port name defined in the image manifest file
Volumes []string `codec:"volumes"` // Host-Volumes to mount in, syntax: /path/to/host/directory:/destination/path/in/container[:readOnly]
InsecureOptions []string `codec:"insecure_options"` // list of args for --insecure-options
NoOverlay bool `codec:"no_overlay"` // disable overlayfs for rkt run
Debug bool `codec:"debug"` // Enable debug option for rkt command
Group string `codec:"group"` // Group override for the container
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the taskConfig state and handler
// during recovery.
type TaskState struct {
ReattachConfig *shared.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
UUID string
// Driver is a driver for running images via Rkt We attempt to chose sane
// defaults for now, with more configuration available planned in the future.
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config *Config
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to rktTaskHandles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
func (d *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
var config Config
if err := base.MsgPackDecode(data, &config); err != nil {
return err
d.config = &config
if cfg != nil {
d.nomadConfig = cfg.Driver
return nil
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
defer close(ch)
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
case <-d.ctx.Done():
case <-ticker.C:
ch <- d.buildFingerprint()
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
fingerprint := &drivers.Fingerprint{
Attributes: map[string]string{},
Health: drivers.HealthStateHealthy,
HealthDescription: "ready",
// Only enable if we are root
if syscall.Geteuid() != 0 {
d.logger.Debug("must run as root user, disabling")
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = "driver must run as root user"
return fingerprint
outBytes, err := exec.Command(rktCmd, "version").Output()
if err != nil {
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = fmt.Sprintf("failed to executor %s version: %v", rktCmd, err)
return fingerprint
out := strings.TrimSpace(string(outBytes))
rktMatches := reRktVersion.FindStringSubmatch(out)
appcMatches := reAppcVersion.FindStringSubmatch(out)
if len(rktMatches) != 2 || len(appcMatches) != 2 {
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = "unable to parse rkt version string"
return fingerprint
minVersion, _ := version.NewVersion(minRktVersion)
currentVersion, _ := version.NewVersion(rktMatches[1])
if currentVersion.LessThan(minVersion) {
// Do not allow ancient rkt versions
fingerprint.Health = drivers.HealthStateUndetected
fingerprint.HealthDescription = fmt.Sprintf("unsuported rkt version %s", currentVersion)
d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(),
"rkt_version", currentVersion)
return fingerprint
fingerprint.Attributes["driver.rkt"] = "1"
fingerprint.Attributes["driver.rkt.version"] = rktMatches[1]
fingerprint.Attributes["driver.rkt.appc.version"] = appcMatches[1]
if d.config.VolumesEnabled {
fingerprint.Attributes["driver.rkt.volumes.enabled"] = "1"
return fingerprint
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
return nil
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode taskConfig state from handle: %v", err)
plugRC, err := shared.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
d.logger.Error("failed to build ReattachConfig from taskConfig state", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
pluginConfig := &plugin.ClientConfig{
Reattach: plugRC,
execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
// The taskConfig's environment is set via --set-env flags in Start, but the rkt
// command itself needs an environment with PATH set to find iptables.
// TODO (preetha) need to figure out how to read env.blacklist
eb := env.NewEmptyBuilder()
filter := strings.Split(config.DefaultEnvBlacklist, ",")
rktEnv := eb.SetHostEnvvars(filter).Build()
h := &taskHandle{
exec: execImpl,
env: rktEnv,
pid: taskState.Pid,
uuid: taskState.UUID,
pluginClient: pluginClient,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
d.tasks.Set(taskState.TaskConfig.ID, h)
return nil
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("taskConfig with ID '%s' already started", cfg.ID)
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
handle := drivers.NewTaskHandle(pluginName)
handle.Config = cfg
// todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this
// ACI image
img := driverConfig.ImageName
// Global arguments given to both prepare and run-prepared
globalArgs := make([]string, 0, 50)
// Add debug option to rkt command.
debug := driverConfig.Debug
// Add the given trust prefix
trustPrefix := driverConfig.TrustPrefix
insecure := false
if trustPrefix != "" {
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, "trust", "--skip-fingerprint-review=true", fmt.Sprintf("--prefix=%s", trustPrefix), fmt.Sprintf("--debug=%t", debug))
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
if err := cmd.Run(); err != nil {
return nil, nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
d.logger.Debug("added trust prefix", "trust_prefix", trustPrefix, "task_name", cfg.Name)
} else {
// Disable signature verification if the trust command was not run.
insecure = true
// if we have a selective insecure_options, prefer them
// insecure options are rkt's global argument, so we do this before the actual "run"
if len(driverConfig.InsecureOptions) > 0 {
globalArgs = append(globalArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
} else if insecure {
globalArgs = append(globalArgs, "--insecure-options=all")
// debug is rkt's global argument, so add it before the actual "run"
globalArgs = append(globalArgs, fmt.Sprintf("--debug=%t", debug))
prepareArgs := make([]string, 0, 50)
runArgs := make([]string, 0, 50)
prepareArgs = append(prepareArgs, globalArgs...)
prepareArgs = append(prepareArgs, "prepare")
runArgs = append(runArgs, globalArgs...)
runArgs = append(runArgs, "run-prepared")
// disable overlayfs
if driverConfig.NoOverlay {
prepareArgs = append(prepareArgs, "--no-overlay=true")
// Convert underscores to dashes in taskConfig names for use in volume names #2358
sanitizedName := strings.Replace(cfg.Name, "_", "-", -1)
// Mount /alloc
allocVolName := fmt.Sprintf("%s-%s-alloc", cfg.ID, sanitizedName)
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, cfg.TaskDir().SharedAllocDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, "/alloc"))
// Mount /local
localVolName := fmt.Sprintf("%s-%s-local", cfg.ID, sanitizedName)
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, cfg.TaskDir().LocalDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, "/local"))
// Mount /secrets
secretsVolName := fmt.Sprintf("%s-%s-secrets", cfg.ID, sanitizedName)
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, cfg.TaskDir().SecretsDir))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, "/secrets"))
// Mount arbitrary volumes if enabled
if len(driverConfig.Volumes) > 0 {
if !d.config.VolumesEnabled {
return nil, nil, fmt.Errorf("volumes_enabled is false; cannot use rkt volumes: %+q", driverConfig.Volumes)
for i, rawvol := range driverConfig.Volumes {
parts := strings.Split(rawvol, ":")
readOnly := "false"
// job spec:
// volumes = ["/host/path:/container/path[:readOnly]"]
// the third parameter is optional, mount is read-write by default
if len(parts) == 3 {
if parts[2] == "readOnly" {
d.logger.Debug("mounting volume as readOnly", "volume", strings.Join(parts[:2], parts[1]))
readOnly = "true"
} else {
d.logger.Warn("unknown volume parameter ignored for mount", "parameter", parts[2], "mount", parts[0])
} else if len(parts) != 2 {
return nil, nil, fmt.Errorf("invalid rkt volume: %q", rawvol)
volName := fmt.Sprintf("%s-%s-%d", cfg.ID, sanitizedName, i)
prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
// Inject environment variables
for k, v := range cfg.Env {
prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
// Image is set here, because the commands that follow apply to it
prepareArgs = append(prepareArgs, img)
// Check if the user has overridden the exec command.
if driverConfig.Command != "" {
prepareArgs = append(prepareArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
// Add memory isolator
prepareArgs = append(prepareArgs, fmt.Sprintf("--memory=%v", cfg.Resources.LinuxResources.MemoryLimitBytes))
// Add CPU isolator
prepareArgs = append(prepareArgs, fmt.Sprintf("--cpu-shares=%v", cfg.Resources.LinuxResources.CPUShares))
// Add DNS servers
if len(driverConfig.DNSServers) == 1 && (driverConfig.DNSServers[0] == "host" || driverConfig.DNSServers[0] == "none") {
// Special case single item lists with the special values "host" or "none"
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
} else {
for _, ip := range driverConfig.DNSServers {
if err := net.ParseIP(ip); err == nil {
wrappedErr := fmt.Errorf("invalid ip address for container dns server %q", ip)
d.logger.Debug("error parsing DNS server", "error", wrappedErr)
return nil, nil, wrappedErr
runArgs = append(runArgs, fmt.Sprintf("--dns=%s", ip))
// set DNS search domains
for _, domain := range driverConfig.DNSSearchDomains {
runArgs = append(runArgs, fmt.Sprintf("--dns-search=%s", domain))
// set network
network := strings.Join(driverConfig.Net, ",")
if network != "" {
runArgs = append(runArgs, fmt.Sprintf("--net=%s", network))
// Setup port mapping and exposed ports
if len(cfg.Resources.NomadResources.Networks) == 0 {
d.logger.Debug("no network interfaces are available")
if len(driverConfig.PortMap) > 0 {
return nil, nil, fmt.Errorf("Trying to map ports but no network interface is available")
} else if network == "host" {
// Port mapping is skipped when host networking is used.
d.logger.Debug("Ignoring port_map when using --net=host", "task_name", cfg.Name)
} else {
network := cfg.Resources.NomadResources.Networks[0]
for _, port := range network.ReservedPorts {
var containerPort string
mapped, ok := driverConfig.PortMap[port.Label]
if !ok {
// If the user doesn't have a mapped port using port_map, driver stops running container.
return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.")
containerPort = mapped
hostPortStr := strconv.Itoa(port.Value)
d.logger.Debug("driver.rkt: exposed port", "containerPort", containerPort)
// Add port option to rkt run arguments. rkt allows multiple port args
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
for _, port := range network.DynamicPorts {
// By default we will map the allocated port 1:1 to the container
var containerPort string
if mapped, ok := driverConfig.PortMap[port.Label]; ok {
containerPort = mapped
} else {
// If the user doesn't have mapped a port using port_map, driver stops running container.
return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.")
hostPortStr := strconv.Itoa(port.Value)
d.logger.Debug("exposed port", "containerPort", containerPort, "task_name", cfg.Name)
// Add port option to rkt run arguments. rkt allows multiple port args
prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
// If a user has been specified for the taskConfig, pass it through to the user
if cfg.User != "" {
prepareArgs = append(prepareArgs, fmt.Sprintf("--user=%s", cfg.User))
// There's no taskConfig-level parameter for groups so check the driver
// config for a custom group
if driverConfig.Group != "" {
prepareArgs = append(prepareArgs, fmt.Sprintf("--group=%s", driverConfig.Group))
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
// Need to start arguments with "--"
prepareArgs = append(prepareArgs, "--")
for _, arg := range driverConfig.Args {
prepareArgs = append(prepareArgs, fmt.Sprintf("%v", arg))
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%s-executor.out", cfg.Name))
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
execImpl, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, err
absPath, err := GetAbsolutePath(rktCmd)
if err != nil {
return nil, nil, err
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, prepareArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
d.logger.Debug("preparing taskConfig", "pod", img, "task_name", cfg.Name, "args", prepareArgs)
if err := cmd.Run(); err != nil {
return nil, nil, fmt.Errorf("Error preparing rkt pod: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
uuid := strings.TrimSpace(outBuf.String())
d.logger.Debug("taskConfig prepared", "pod", img, "task_name", cfg.Name, "uuid", uuid)
runArgs = append(runArgs, uuid)
// The taskConfig's environment is set via --set-env flags above, but the rkt
// command itself needs an environment with PATH set to find iptables.
// TODO (preetha) need to figure out how to pass env.blacklist from client config
eb := env.NewEmptyBuilder()
filter := strings.Split(config.DefaultEnvBlacklist, ",")
rktEnv := eb.SetHostEnvvars(filter).Build()
// Enable ResourceLimits to place the executor in a parent cgroup of
// the rkt container. This allows stats collection via the executor to
// work just like it does for exec.
execCmd := &executor.ExecCommand{
Cmd: absPath,
Args: runArgs,
ResourceLimits: true,
Resources: &executor.Resources{
CPU: int(cfg.Resources.LinuxResources.CPUShares),
MemoryMB: int(drivers.BytesToMB(cfg.Resources.LinuxResources.MemoryLimitBytes)),
DiskMB: cfg.Resources.NomadResources.DiskMB,
Env: cfg.EnvList(),
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
ps, err := execImpl.Launch(execCmd)
if err != nil {
return nil, nil, err
d.logger.Debug("started taskConfig", "aci", img, "uuid", uuid, "task_name", cfg.Name, "args", runArgs)
h := &taskHandle{
exec: execImpl,
env: rktEnv,
pid: ps.Pid,
uuid: uuid,
pluginClient: pluginClient,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
rktDriverState := TaskState{
ReattachConfig: shared.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
UUID: uuid,
if err := handle.SetDriverState(&rktDriverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name)
execImpl.Shutdown("", 0)
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
d.tasks.Set(cfg.ID, h)
// Do not attempt to retrieve driver network if one won't exist:
// - "host" means the container itself has no networking metadata
// - "none" means no network is configured
var driverNetwork *cstructs.DriverNetwork
if network != "host" && network != "none" {
d.logger.Debug("retrieving network information for pod", "pod", img, "UUID", uuid, "task_name", cfg.Name)
driverNetwork, err = rktGetDriverNetwork(uuid, driverConfig.PortMap, d.logger)
if err != nil && !pluginClient.Exited() {
d.logger.Warn("network status retrieval for pod failed", "pod", img, "UUID", uuid, "task_name", cfg.Name, "error", err)
// If a portmap was given, this turns into a fatal error
if len(driverConfig.PortMap) != 0 {
return nil, nil, fmt.Errorf("Trying to map ports but driver could not determine network information")
return handle, driverNetwork, nil
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
return ch, nil
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
return fmt.Errorf("executor Shutdown failed: %v", err)
return nil
func (d *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
return nil
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
return handle.TaskStatus(), nil
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
return handle.exec.Stats()
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
func (d *Driver) SignalTask(taskID string, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.taskConfig.ID, "task_name", handle.taskConfig.Name)
sig = s
return handle.exec.Signal(sig)
func (d *Driver) ExecTask(taskID string, cmdArgs []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
if len(cmdArgs) == 0 {
return nil, fmt.Errorf("error cmd must have atleast one value")
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
// enter + UUID + cmd + args...
cmd := cmdArgs[0]
args := cmdArgs[1:]
enterArgs := make([]string, 3+len(args))
enterArgs[0] = "enter"
enterArgs[1] = handle.uuid
enterArgs[2] = handle.env.ReplaceEnv(cmd)
copy(enterArgs[3:], handle.env.ParseAndReplace(args))
out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), rktCmd, enterArgs)
if err != nil {
return nil, err
return &drivers.ExecTaskResult{
Stdout: out,
ExitResult: &drivers.ExitResult{
ExitCode: exitCode,
}, nil
// GetAbsolutePath returns the absolute path of the passed binary by resolving
// it in the path and following symlinks.
func GetAbsolutePath(bin string) (string, error) {
lp, err := exec.LookPath(bin)
if err != nil {
return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, err)
return filepath.EvalSymlinks(lp)
func rktGetDriverNetwork(uuid string, driverConfigPortMap map[string]string, logger hclog.Logger) (*cstructs.DriverNetwork, error) {
deadline := time.Now().Add(networkDeadline)
var lastErr error
try := 0
for time.Now().Before(deadline) {
if status, err := rktGetStatus(uuid, logger); err == nil {
for _, net := range status.Networks {
if !net.IP.IsGlobalUnicast() {
// Get the pod manifest so we can figure out which ports are exposed
var portmap map[string]int
manifest, err := rktGetManifest(uuid)
if err == nil {
portmap, err = rktManifestMakePortMap(manifest, driverConfigPortMap)
if err != nil {
lastErr = fmt.Errorf("could not create manifest-based portmap: %v", err)
return nil, lastErr
} else {
lastErr = fmt.Errorf("could not get pod manifest: %v", err)
return nil, lastErr
// This is a successful landing; log if its not the first attempt.
if try > 1 {
logger.Debug("retrieved network info for pod", "uuid", uuid, "attempt", try)
return &cstructs.DriverNetwork{
PortMap: portmap,
IP: status.Networks[0].IP.String(),
}, nil
if len(status.Networks) == 0 {
lastErr = fmt.Errorf("no networks found")
} else {
lastErr = fmt.Errorf("no good driver networks out of %d returned", len(status.Networks))
} else {
lastErr = fmt.Errorf("getting status failed: %v", err)
waitTime := getJitteredNetworkRetryTime()
logger.Debug("failed getting network info for pod, sleeping", "uuid", uuid, "attempt", try, "err", lastErr, "wait", waitTime)
return nil, fmt.Errorf("timed out, last error: %v", lastErr)
// Given a rkt/appc pod manifest and driver portmap configuration, create
// a driver portmap.
func rktManifestMakePortMap(manifest *appcschema.PodManifest, configPortMap map[string]string) (map[string]int, error) {
if len(manifest.Apps) == 0 {
return nil, fmt.Errorf("manifest has no apps")
if len(manifest.Apps) != 1 {
return nil, fmt.Errorf("manifest has multiple apps!")
app := manifest.Apps[0]
if app.App == nil {
return nil, fmt.Errorf("specified app has no App object")
portMap := make(map[string]int)
for svc, name := range configPortMap {
for _, port := range app.App.Ports {
if port.Name.String() == name {
portMap[svc] = int(port.Port)
return portMap, nil
// Retrieve pod status for the pod with the given UUID.
func rktGetStatus(uuid string, logger hclog.Logger) (*rktv1.Pod, error) {
statusArgs := []string{
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(rktCmd, statusArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
if err := cmd.Run(); err != nil {
if outBuf.Len() > 0 {
logger.Debug("status output for UUID", "uuid", uuid, elide(outBuf))
if errBuf.Len() == 0 {
return nil, err
logger.Debug("status error output", "uuid", uuid, "error", elide(errBuf))
return nil, fmt.Errorf("%s. stderr: %q", err, elide(errBuf))
var status rktv1.Pod
if err := json.Unmarshal(outBuf.Bytes(), &status); err != nil {
return nil, err
return &status, nil
// Retrieves a pod manifest
func rktGetManifest(uuid string) (*appcschema.PodManifest, error) {
statusArgs := []string{
var outBuf bytes.Buffer
cmd := exec.Command(rktCmd, statusArgs...)
cmd.Stdout = &outBuf
cmd.Stderr = ioutil.Discard
if err := cmd.Run(); err != nil {
return nil, err
var manifest appcschema.PodManifest
if err := json.Unmarshal(outBuf.Bytes(), &manifest); err != nil {
return nil, err
return &manifest, nil
// Create a time with a 0 to 100ms jitter for rktGetDriverNetwork retries
func getJitteredNetworkRetryTime() time.Duration {
return time.Duration(900+rand.Intn(100)) * time.Millisecond
// Conditionally elide a buffer to an arbitrary length
func elideToLen(inBuf bytes.Buffer, length int) bytes.Buffer {
if inBuf.Len() > length {
return inBuf
// Conditionally elide a buffer to an 80 character string
func elide(inBuf bytes.Buffer) string {
tempBuf := elideToLen(inBuf, 80)
return tempBuf.String()
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait()
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
} else {
result = &drivers.ExitResult{
ExitCode: ps.ExitCode,
Signal: ps.Signal,
select {
case <-ctx.Done():
case <-d.ctx.Done():
case ch <- result: