open-nomad/client/driver/rkt.go

285 lines
7.5 KiB
Go
Raw Normal View History

package driver
import (
2015-10-07 22:24:16 +00:00
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
2015-10-09 19:14:56 +00:00
"path/filepath"
2015-10-07 22:24:16 +00:00
"regexp"
"runtime"
"strings"
"syscall"
"time"
2015-10-09 19:14:56 +00:00
"github.com/hashicorp/nomad/client/allocdir"
2015-10-07 22:24:16 +00:00
"github.com/hashicorp/nomad/client/config"
2015-11-17 00:23:03 +00:00
cstructs "github.com/hashicorp/nomad/client/driver/structs"
2015-11-05 21:46:02 +00:00
"github.com/hashicorp/nomad/client/fingerprint"
2015-11-26 22:13:19 +00:00
"github.com/hashicorp/nomad/helper/args"
2015-10-07 22:24:16 +00:00
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
// Patterns used to pull version numbers out of the output of `rkt version`.
// Each pattern has exactly one capture group (a digit followed by one or more
// digits or dots), so a successful FindStringSubmatch yields two elements:
// the full match and the captured version.
var (
	// reRktVersion captures the version from a "rkt version <ver>" line.
	reRktVersion = regexp.MustCompile(`rkt version (\d[.\d]+)`)
	// reAppcVersion captures the version from an "appc version <ver>" line.
	reAppcVersion = regexp.MustCompile(`appc version (\d[.\d]+)`)
)
// RktDriver is a driver for running images via Rkt
// We attempt to chose sane defaults for now, with more configuration available
// planned in the future
type RktDriver struct {
2015-10-07 22:24:16 +00:00
DriverContext
2015-11-05 21:46:02 +00:00
fingerprint.StaticFingerprinter
}
2015-11-14 04:22:49 +00:00
// RktDriverConfig is the typed view of a task's driver configuration. Start
// fills it by decoding the task's Config map using the mapstructure tags.
type RktDriverConfig struct {
	// ImageName is read from the "image" key; Start rejects an empty value.
	ImageName string `mapstructure:"image"`

	// Args is read from the "args" key; when non-empty, Start appends the
	// processed arguments to the rkt command line after a "--" separator.
	Args []string `mapstructure:"args"`
}
// rktHandle is returned from Start/Open as a handle to the PID
type rktHandle struct {
2015-10-07 22:24:16 +00:00
proc *os.Process
image string
2015-10-07 22:24:16 +00:00
logger *log.Logger
waitCh chan *cstructs.WaitResult
2015-10-07 22:24:16 +00:00
doneCh chan struct{}
}
// rktPID is the payload that gets JSON-encoded into a handle ID: the PID of
// the running process paired with the name of the image it was started from.
type rktPID struct {
	// Pid is the operating-system process ID.
	Pid int
	// Image is the image name recorded on the handle.
	Image string
}
// NewRktDriver is used to create a new exec driver
func NewRktDriver(ctx *DriverContext) Driver {
2015-11-05 21:46:02 +00:00
return &RktDriver{DriverContext: *ctx}
}
func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
2015-10-07 22:24:16 +00:00
// Only enable if we are root when running on non-windows systems.
if runtime.GOOS != "windows" && syscall.Geteuid() != 0 {
d.logger.Printf("[DEBUG] driver.rkt: must run as root user, disabling")
return false, nil
}
outBytes, err := exec.Command("rkt", "version").Output()
if err != nil {
return false, nil
}
out := strings.TrimSpace(string(outBytes))
rktMatches := reRktVersion.FindStringSubmatch(out)
2015-10-21 01:08:46 +00:00
appcMatches := reAppcVersion.FindStringSubmatch(out)
2015-10-07 22:24:16 +00:00
if len(rktMatches) != 2 || len(appcMatches) != 2 {
return false, fmt.Errorf("Unable to parse Rkt version string: %#v", rktMatches)
}
node.Attributes["driver.rkt"] = "1"
2015-10-21 01:08:46 +00:00
node.Attributes["driver.rkt.version"] = rktMatches[1]
2015-10-07 22:24:16 +00:00
node.Attributes["driver.rkt.appc.version"] = appcMatches[1]
return true, nil
}
// Run an existing Rkt image.
func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
2015-11-14 04:22:49 +00:00
var driverConfig RktDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
}
// Validate that the config is valid.
img := driverConfig.ImageName
if img == "" {
return nil, fmt.Errorf("Missing ACI image for rkt")
}
2015-10-09 19:14:56 +00:00
// Get the tasks local directory.
taskName := d.DriverContext.taskName
taskDir, ok := ctx.AllocDir.TaskDirs[taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
2015-10-07 22:24:16 +00:00
// Add the given trust prefix
2015-12-21 10:10:37 +00:00
trustPrefix, trustCmd := task.Config["trust_prefix"]
if trustCmd {
var outBuf, errBuf bytes.Buffer
2015-12-21 10:10:37 +00:00
cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trustPrefix))
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
}
2015-12-21 10:10:37 +00:00
d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trustPrefix)
2015-10-07 22:24:16 +00:00
}
// Build the command.
2015-12-21 10:10:37 +00:00
var cmdArgs []string
// Inject the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)
// Clear the task directories as they are not currently supported.
envVars.ClearTaskLocalDir()
envVars.ClearAllocDir()
for k, v := range envVars.Map() {
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
2015-10-07 22:24:16 +00:00
}
// Disble signature verification if the trust command was not run.
2015-12-21 10:10:37 +00:00
if !trustCmd {
cmdArgs = append(cmdArgs, "--insecure-skip-verify")
}
// Append the run command.
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, "run", "--mds-register=false", img)
// Check if the user has overriden the exec command.
2015-12-21 10:10:37 +00:00
if execCmd, ok := task.Config["command"]; ok {
cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", execCmd))
2015-10-07 22:24:16 +00:00
}
if task.Resources.MemoryMB == 0 {
2015-12-21 06:09:11 +00:00
return nil, fmt.Errorf("Memory limit cannot be zero")
}
if task.Resources.CPU == 0 {
return nil, fmt.Errorf("CPU limit cannot be zero")
}
// Add memory isolator
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)*1024*1024))
// Add CPU isolator
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map())
// Need to start arguments with "--"
if len(parsed) > 0 {
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, "--")
}
for _, arg := range parsed {
2015-12-21 10:10:37 +00:00
cmdArgs = append(cmdArgs, fmt.Sprintf("%v", arg))
}
2015-10-07 22:24:16 +00:00
}
2015-10-09 19:14:56 +00:00
// Create files to capture stdin and out.
stdoutFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stdout", taskName))
stderrFilename := filepath.Join(taskLocal, fmt.Sprintf("%s.stderr", taskName))
stdo, err := os.OpenFile(stdoutFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to redirect stdout: %v", err)
}
stde, err := os.OpenFile(stderrFilename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err)
}
2015-12-21 10:10:37 +00:00
cmd := exec.Command("rkt", cmdArgs...)
2015-10-09 19:14:56 +00:00
cmd.Stdout = stdo
cmd.Stderr = stde
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("Error running rkt: %v", err)
2015-10-07 22:24:16 +00:00
}
d.logger.Printf("[DEBUG] driver.rkt: started ACI %q with: %v", img, cmd.Args)
2015-10-07 22:24:16 +00:00
h := &rktHandle{
proc: cmd.Process,
image: img,
2015-10-07 22:24:16 +00:00
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
2015-10-07 22:24:16 +00:00
}
go h.run()
return h, nil
}
func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
2015-10-07 22:24:16 +00:00
// Parse the handle
pidBytes := []byte(strings.TrimPrefix(handleID, "Rkt:"))
qpid := &rktPID{}
if err := json.Unmarshal(pidBytes, qpid); err != nil {
return nil, fmt.Errorf("failed to parse Rkt handle '%s': %v", handleID, err)
}
// Find the process
proc, err := os.FindProcess(qpid.Pid)
if proc == nil || err != nil {
return nil, fmt.Errorf("failed to find Rkt PID %d: %v", qpid.Pid, err)
}
// Return a driver handle
h := &rktHandle{
proc: proc,
image: qpid.Image,
2015-10-07 22:24:16 +00:00
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *cstructs.WaitResult, 1),
2015-10-07 22:24:16 +00:00
}
go h.run()
return h, nil
}
func (h *rktHandle) ID() string {
2015-10-07 22:24:16 +00:00
// Return a handle to the PID
pid := &rktPID{
Pid: h.proc.Pid,
Image: h.image,
2015-10-07 22:24:16 +00:00
}
data, err := json.Marshal(pid)
if err != nil {
h.logger.Printf("[ERR] driver.rkt: failed to marshal rkt PID to JSON: %s", err)
}
return fmt.Sprintf("Rkt:%s", string(data))
}
func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
2015-10-07 22:24:16 +00:00
return h.waitCh
}
func (h *rktHandle) Update(task *structs.Task) error {
2015-10-07 22:24:16 +00:00
// Update is not possible
return nil
}
// Kill is used to terminate the task. We send an Interrupt
// and then provide a 5 second grace period before doing a Kill.
func (h *rktHandle) Kill() error {
2015-10-07 22:24:16 +00:00
h.proc.Signal(os.Interrupt)
select {
case <-h.doneCh:
return nil
case <-time.After(5 * time.Second):
return h.proc.Kill()
}
}
func (h *rktHandle) run() {
2015-10-07 22:24:16 +00:00
ps, err := h.proc.Wait()
close(h.doneCh)
code := 0
if !ps.Success() {
// TODO: Better exit code parsing.
code = 1
2015-10-07 22:24:16 +00:00
}
h.waitCh <- cstructs.NewWaitResult(code, 0, err)
2015-10-07 22:24:16 +00:00
close(h.waitCh)
}