drivers: Add/Use go-getter to fetch remote binaries

Updates Qemu, Java drivers to use go-getter to fetch binaries
Adds remote artifact support for Exec, Raw Exec drivers
This commit is contained in:
Clint Shryock 2015-10-15 16:40:08 -05:00
parent 6f104418f3
commit 343daeb1ea
13 changed files with 407 additions and 74 deletions

View File

@ -3,6 +3,7 @@ package driver
import (
"fmt"
"log"
"path/filepath"
"sync"
"github.com/hashicorp/nomad/client/allocdir"
@ -114,6 +115,12 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.
if ctx.AllocDir != nil {
env.SetAllocDir(ctx.AllocDir.SharedDir)
taskdir, ok := ctx.AllocDir.TaskDirs[task.Name]
if !ok {
// TODO: Update this to return an error
}
env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal))
}
if task.Resources != nil {

View File

@ -2,10 +2,15 @@ package driver
import (
"fmt"
"log"
"path"
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
"github.com/hashicorp/nomad/nomad/structs"
@ -41,12 +46,40 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}
func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}
// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable
// Create a location to download the artifact.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Exec driver: %s", err)
}
// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0755); err != nil {
log.Printf("[ERR] driver.Exec: Error making artifact executable: %s", err)
}
}
}
// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)

View File

@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"
@ -120,6 +121,104 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}
func TestExecDriver_Start_Artifact_basic(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}
task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": filepath.Join("$NOMAD_TASK_DIR", file),
},
Resources: basicResources,
}
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}
// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}
func TestExecDriver_Start_Artifact_expanded(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}
task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": "/bin/bash",
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
},
Resources: basicResources,
}
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}
// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}
func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
ctestutils.ExecCompatible(t)

View File

@ -3,9 +3,6 @@ package driver
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
@ -14,6 +11,7 @@ import (
"syscall"
"time"
"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
@ -97,37 +95,18 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("missing jar source for Java Jar driver")
}
// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}
// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
// Create a location to download the binary.
fName := path.Base(source)
fPath := filepath.Join(taskLocal, fName)
f, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}
defer f.Close()
defer resp.Body.Close()
// Copy remote file to local directory for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(f, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying jar from source: %s", ioErr)
jarName := path.Base(source)
jarPath := filepath.Join(destDir, jarName)
if err := getter.GetFile(jarPath, source); err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}
// Get the environment variables.
@ -141,10 +120,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args = append(args, jvm_options)
}
// Build the argument list
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, fName))
// Build the argument list.
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
if argRaw, ok := task.Config["args"]; ok {
args = append(args, argRaw)
}

View File

@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
@ -19,6 +18,7 @@ import (
"syscall"
"time"
"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
@ -94,45 +94,25 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Missing required Task Resource: Memory")
}
// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err)
}
// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
// Create a location in the local directory to download and store the image.
// TODO: Caching
// Create a location to download the binary.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source))
fPath := filepath.Join(taskLocal, vmID)
vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}
defer vmPath.Close()
defer resp.Body.Close()
// Copy remote file to local AllocDir for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(vmPath, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr)
vmPath := filepath.Join(destDir, vmID)
if err := getter.GetFile(vmPath, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Qemu driver: %s", err)
}
// compute and check checksum
if check, ok := task.Config["checksum"]; ok {
d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID)
hasher := sha256.New()
file, err := os.Open(vmPath.Name())
file, err := os.Open(vmPath)
if err != nil {
return nil, fmt.Errorf("Failed to open file for checksum")
}
@ -163,7 +143,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,
"-drive", "file=" + vmPath.Name(),
"-drive", "file=" + vmPath,
"-nodefconfig",
"-nodefaults",
"-nographic",
@ -240,7 +220,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Create and Return Handle
h := &qemuHandle{
proc: cmd.Process,
vmID: vmPath.Name(),
vmID: vmPath,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}

View File

@ -2,14 +2,18 @@ package driver
import (
"fmt"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/args"
@ -61,12 +65,6 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
}
func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for raw_exec driver")
}
// Get the tasks local directory.
taskName := d.DriverContext.taskName
taskDir, ok := ctx.AllocDir.TaskDirs[taskName]
@ -75,9 +73,49 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for Raw Exec driver")
}
// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable
// Create a location to download the artifact.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Raw Exec driver: %s", err)
}
// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0755); err != nil {
log.Printf("[ERR] driver.raw_exec: Error making artifact executable: %s", err)
}
}
}
// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)
// expand NOMAD_TASK_DIR
parsedPath, err := args.ParseAndReplace(command, envVars.Map())
if err != nil {
return nil, fmt.Errorf("failure to parse arguments in command path: %v", command)
} else if len(parsedPath) != 1 {
return nil, fmt.Errorf("couldn't properly parse command path: %v", command)
}
cm := parsedPath[0]
// Look for arguments
var cmdArgs []string
if argRaw, ok := task.Config["args"]; ok {
@ -89,7 +127,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
// Setup the command
cmd := exec.Command(command, cmdArgs...)
cmd := exec.Command(cm, cmdArgs...)
cmd.Dir = taskDir
cmd.Env = envVars.List()

View File

@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"
@ -92,6 +93,113 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
}
}
func TestRawExecDriver_Start_Artifact_basic(t *testing.T) {
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}
task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": filepath.Join("$NOMAD_TASK_DIR", file),
},
}
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Attempt to open
handle2, err := d.Open(ctx, handle.ID())
if err != nil {
t.Fatalf("err: %v", err)
}
if handle2 == nil {
t.Fatalf("missing handle")
}
// Task should terminate quickly
select {
case <-handle2.WaitCh():
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
// Check they are both tracking the same PID.
pid1 := handle.(*rawExecHandle).proc.Pid
pid2 := handle2.(*rawExecHandle).proc.Pid
if pid1 != pid2 {
t.Fatalf("tracking incorrect Pid; %v != %v", pid1, pid2)
}
}
func TestRawExecDriver_Start_Artifact_expanded(t *testing.T) {
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}
task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": "/bin/bash",
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
},
}
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Attempt to open
handle2, err := d.Open(ctx, handle.ID())
if err != nil {
t.Fatalf("err: %v", err)
}
if handle2 == nil {
t.Fatalf("missing handle")
}
// Task should terminate quickly
select {
case <-handle2.WaitCh():
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
// Check they are both tracking the same PID.
pid1 := handle.(*rawExecHandle).proc.Pid
pid2 := handle2.(*rawExecHandle).proc.Pid
if pid1 != pid2 {
t.Fatalf("tracking incorrect Pid; %v != %v", pid1, pid2)
}
}
func TestRawExecDriver_Start_Wait(t *testing.T) {
task := &structs.Task{
Name: "sleep",

View File

@ -135,6 +135,7 @@ func (e *LinuxExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocD
return err
}
env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
e.Cmd.Env = env.List()
e.alloc = alloc
@ -265,6 +266,14 @@ func (e *LinuxExecutor) Start() error {
return err
}
parsedPath, err := args.ParseAndReplace(e.cmd.Path, envVars.Map())
if err != nil {
return err
} else if len(parsedPath) != 1 {
return fmt.Errorf("couldn't properly parse command path: %v", e.cmd.Path)
}
e.cmd.Path = parsedPath[0]
combined := strings.Join(e.Cmd.Args, " ")
parsed, err := args.ParseAndReplace(combined, envVars.Map())
if err != nil {

View File

@ -6,8 +6,11 @@ import (
"fmt"
"os"
"strconv"
"strings"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/client/driver/environment"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -29,11 +32,37 @@ func (e *UniversalExecutor) Limit(resources *structs.Resources) error {
}
func (e *UniversalExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error {
// No-op
taskDir, ok := alloc.TaskDirs[taskName]
if !ok {
return fmt.Errorf("Error finding task dir for (%s)", taskName)
}
e.Dir = taskDir
return nil
}
func (e *UniversalExecutor) Start() error {
// Parse the commands arguments and replace instances of Nomad environment
// variables.
envVars, err := environment.ParseFromList(e.cmd.Env)
if err != nil {
return err
}
parsedPath, err := args.ParseAndReplace(e.cmd.Path, envVars.Map())
if err != nil {
return err
} else if len(parsedPath) != 1 {
return fmt.Errorf("couldn't properly parse command path: %v", e.cmd.Path)
}
e.cmd.Path = parsedPath[0]
combined := strings.Join(e.cmd.Args, " ")
parsed, err := args.ParseAndReplace(combined, envVars.Map())
if err != nil {
return err
}
e.Cmd.Args = parsed
// We don't want to call ourself. We want to call Start on our embedded Cmd
return e.cmd.Start()
}

1
website/.ruby-version Normal file
View File

@ -0,0 +1 @@
2.2.2

View File

@ -20,8 +20,10 @@ scripts or other wrappers which provide higher level features.
The `exec` driver supports the following configuration in the job spec:
* `command` - The command to execute. Must be provided.
* `command` - (Required) The command to execute. Must be provided.
* `artifact_source` (Optional) Source location of an executable artifact. Must be accessible
from the Nomad client. If you specify an `artifact_source` to be executed, you
must reference it in the `command` as show in the examples below
* `args` - The argument list to the command, space seperated. Optional.
## Client Requirements
@ -30,6 +32,30 @@ The `exec` driver can run on all supported operating systems but to provide
proper isolation the client must be run as root on non-Windows operating systems.
Further, to support cgroups, `/sys/fs/cgroups/` must be mounted.
You must specify a `command` to be executed. Optionally you can specify an
`artifact_source` to be downloaded as well. Any `command` is assumed to be present on the
running client, or a downloaded artifact.
## Examples
To run a binary present on the Node:
```
config {
command = "/bin/sleep"
args = 1
}
```
To execute a binary specified by `artifact_source`:
```
config {
artifact_source = "https://dl.dropboxusercontent.com/u/1234/binary.bin"
command = "$NOMAD_TASK_DIR/binary.bin"
}
```
## Client Attributes
The `exec` driver will set the following client attributes:

View File

@ -19,7 +19,7 @@ HTTP from the Nomad client.
The `java` driver supports the following configuration in the job spec:
* `jar_source` - **(Required)** The hosted location of the source Jar file. Must be accessible
from the Nomad client, via HTTP
from the Nomad client
* `args` - **(Optional)** The argument list for the `java` command, space separated.

View File

@ -18,8 +18,10 @@ As such, it should be used with extreme care and is disabled by default.
The `raw_exec` driver supports the following configuration in the job spec:
* `command` - The command to execute. Must be provided.
* `command` - (Required) The command to execute. Must be provided.
* `artifact_source` (Optional) Source location of an executable artifact. Must be accessible
from the Nomad client. If you specify an `artifact_source` to be executed, you
must reference it in the `command` as show in the examples below
* `args` - The argument list to the command, space seperated. Optional.
## Client Requirements
@ -35,6 +37,30 @@ options = {
}
```
You must specify a `command` to be executed. Optionally you can specify an
`artifact_source` to be executed. Any `command` is assumed to be present on the
running client, or a downloaded artifact
## Examples
To run a binary present on the Node:
```
config {
command = "/bin/sleep"
args = 1
}
```
To execute a binary specified by `artifact_source`:
```
config {
artifact_source = "https://dl.dropboxusercontent.com/u/1234/binary.bin"
command = "$NOMAD_TASK_DIR/binary.bin"
}
```
## Client Attributes
The `raw_exec` driver will set the following client attributes: