Merge pull request #921 from hashicorp/f-artifact-download

Allow downloading many artifacts and support unarchiving
This commit is contained in:
Alex Dadgar 2016-03-15 21:08:55 -07:00
commit 0fb03ff2db
35 changed files with 912 additions and 403 deletions

View File

@ -92,6 +92,13 @@ type Task struct {
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
Artifacts []*TaskArtifact
}
// TaskArtifact is used to download artifacts before running a task.
type TaskArtifact struct {
GetterSource string
GetterOptions map[string]string
}
// NewTask creates and initializes a new Task.
@ -147,24 +154,27 @@ type TaskState struct {
}
const (
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
)
// TaskEvent is an event that effects the state of a task and contains meta-data
// appropriate to the events type.
type TaskEvent struct {
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
DownloadError string
}

View File

@ -1,6 +1,7 @@
package driver
import (
"io"
"log"
"math/rand"
"os"
@ -38,6 +39,31 @@ func TestMain(m *testing.M) {
}
}
// copyFile moves an existing file to the destination
func copyFile(src, dst string, t *testing.T) {
in, err := os.Open(src)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer func() {
if err := out.Close(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
}()
if _, err = io.Copy(out, in); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
if err := out.Sync(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
return
}
func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
}

View File

@ -15,7 +15,6 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
@ -28,10 +27,8 @@ type ExecDriver struct {
}
type ExecDriverConfig struct {
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
}
// execHandle is returned from Start/Open as a handle to the PID
@ -89,21 +86,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
}
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)

View File

@ -188,54 +188,6 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}
func TestExecDriver_Start_Artifact_basic(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
file := "hi_linux_amd64"
checksum := "sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c"
task := &structs.Task{
Name: "sleep",
Config: map[string]interface{}{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum),
"command": file,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}
// Task should terminate quickly
select {
case res := <-handle.WaitCh():
if !res.Successful() {
t.Fatalf("err: %v", res)
}
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
}
func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)

View File

@ -5,7 +5,6 @@ import (
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
@ -151,11 +150,10 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
e.cmd.Env = ctx.TaskEnv.EnvList()
e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd)
e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
if filepath.Base(command.Cmd) == command.Cmd {
if lp, err := exec.LookPath(command.Cmd); err != nil {
} else {
e.cmd.Path = lp
}
// Ensure that the binary being started is executable.
if err := e.makeExecutable(e.cmd.Path); err != nil {
return nil, err
}
// starting the process
@ -280,3 +278,24 @@ func (e *UniversalExecutor) configureTaskDir() error {
e.cmd.Dir = taskDir
return nil
}
// makeExecutablePosix makes the given file executable for root,group,others.
func (e *UniversalExecutor) makeExecutablePosix(binPath string) error {
fi, err := os.Stat(binPath)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("binary %q does not exist", binPath)
}
return fmt.Errorf("specified binary is invalid: %v", err)
}
// If it is not executable, make it so.
perm := fi.Mode().Perm()
req := os.FileMode(0555)
if perm&req != req {
if err := os.Chmod(binPath, perm|req); err != nil {
return fmt.Errorf("error making %q executable: %s", binPath, err)
}
}
return nil
}

View File

@ -3,9 +3,25 @@
package executor
import (
"path/filepath"
"runtime"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
func (e *UniversalExecutor) makeExecutable(binPath string) error {
if runtime.GOOS == "windows" {
return nil
}
path := binPath
if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}
func (e *UniversalExecutor) configureChroot() error {
return nil
}

View File

@ -36,6 +36,18 @@ var (
}
)
func (e *UniversalExecutor) makeExecutable(binPath string) error {
path := binPath
if e.ctx.FSIsolation {
// The path must be relative the chroot
path = filepath.Join(e.taskDir, binPath)
} else if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}
// configureIsolation configures chroot and creates cgroups
func (e *UniversalExecutor) configureIsolation() error {
if e.ctx.FSIsolation {

View File

@ -21,7 +21,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -34,10 +33,9 @@ type JavaDriver struct {
}
type JavaDriverConfig struct {
JvmOpts []string `mapstructure:"jvm_options"`
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Args []string `mapstructure:"args"`
JarPath string `mapstructure:"jar_path"`
JvmOpts []string `mapstructure:"jvm_options"`
Args []string `mapstructure:"args"`
}
// javaHandle is returned from Start/Open as a handle to the PID
@ -124,19 +122,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Proceed to download an artifact to be executed.
path, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
if driverConfig.JarPath == "" {
return nil, fmt.Errorf("jar_path must be specified")
}
jarName := filepath.Base(path)
args := []string{}
// Look for jvm options
if len(driverConfig.JvmOpts) != 0 {
@ -145,7 +134,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
// Build the argument list.
args = append(args, "-jar", jarName)
args = append(args, "-jar", driverConfig.JarPath)
if len(driverConfig.Args) != 0 {
args = append(args, driverConfig.Args...)
}
@ -160,7 +149,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
execIntf, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -175,7 +164,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
ResourceLimits: true,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx)
absPath, err := GetAbsolutePath("java")
if err != nil {
return nil, err
}
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
@ -186,7 +180,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
executor: execIntf,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,

View File

@ -58,9 +58,8 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
task := &structs.Task{
Name: "demo-app",
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
"jvm_options": []string{"-Xmx64m", "-Xms32m"},
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
"jar_path": "demoapp.jar",
"jvm_options": []string{"-Xmx64m", "-Xms32m"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
@ -73,6 +72,10 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewJavaDriver(driverCtx)
// Copy the test jar into the task's directory
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -108,8 +111,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
task := &structs.Task{
Name: "demo-app",
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
"checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8",
"jar_path": "demoapp.jar",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
@ -119,9 +121,12 @@ func TestJavaDriver_Start_Wait(t *testing.T) {
}
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewJavaDriver(driverCtx)
// Copy the test jar into the task's directory
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -168,7 +173,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
task := &structs.Task{
Name: "demo-app",
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar",
"jar_path": "demoapp.jar",
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
@ -181,6 +186,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewJavaDriver(driverCtx)
// Copy the test jar into the task's directory
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)

View File

@ -17,7 +17,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
@ -36,10 +35,9 @@ type QemuDriver struct {
}
type QemuDriverConfig struct {
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Accelerator string `mapstructure:"accelerator"`
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
ImagePath string `mapstructure:"image_path"`
Accelerator string `mapstructure:"accelerator"`
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
}
// qemuHandle is returned from Start/Open as a handle to the PID
@ -98,16 +96,11 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
// Get the image source
source, ok := task.Config["artifact_source"]
if !ok || source == "" {
return nil, fmt.Errorf("Missing source image Qemu driver")
}
// Qemu defaults to 128M of RAM for a given VM. Instead, we force users to
// supply a memory size in the tasks resources
if task.Resources == nil || task.Resources.MemoryMB == 0 {
return nil, fmt.Errorf("Missing required Task Resource: Memory")
vmPath := driverConfig.ImagePath
if vmPath == "" {
return nil, fmt.Errorf("image_path must be set")
}
vmID := filepath.Base(vmPath)
// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
@ -115,19 +108,6 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Proceed to download an artifact to be executed.
vmPath, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
}
vmID := filepath.Base(vmPath)
// Parse configuration arguments
// Create the base arguments
accelerator := "tcg"
@ -137,8 +117,13 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// TODO: Check a lower bounds, e.g. the default 128 of Qemu
mem := fmt.Sprintf("%dM", task.Resources.MemoryMB)
absPath, err := GetAbsolutePath("qemu-system-x86_64")
if err != nil {
return nil, err
}
args := []string{
"qemu-system-x86_64",
absPath,
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,

View File

@ -2,6 +2,7 @@ package driver
import (
"fmt"
"path/filepath"
"testing"
"github.com/hashicorp/nomad/client/config"
@ -37,13 +38,11 @@ func TestQemuDriver_Fingerprint(t *testing.T) {
func TestQemuDriver_StartOpen_Wait(t *testing.T) {
t.Parallel()
ctestutils.QemuCompatible(t)
// TODO: use test server to load from a fixture
task := &structs.Task{
Name: "linux",
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/linux-0.2.img",
"checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544",
"accelerator": "tcg",
"image_path": "linux-0.2.img",
"accelerator": "tcg",
"port_map": []map[string]int{{
"main": 22,
"web": 8080,
@ -68,6 +67,10 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) {
defer execCtx.AllocDir.Destroy()
d := NewQemuDriver(driverCtx)
// Copy the test image into the task's directory
dst, _ := execCtx.AllocDir.TaskDirs[task.Name]
copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
@ -90,33 +93,3 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) {
fmt.Printf("\nError killing Qemu test: %s", err)
}
}
func TestQemuDriver_RequiresMemory(t *testing.T) {
t.Parallel()
ctestutils.QemuCompatible(t)
// TODO: use test server to load from a fixture
task := &structs.Task{
Name: "linux",
Config: map[string]interface{}{
"artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/linux-0.2.img",
"accelerator": "tcg",
"host_port": "8080",
"guest_port": "8081",
"checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544",
// ssh u/p would be here
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
}
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewQemuDriver(driverCtx)
_, err := d.Start(execCtx, task)
if err == nil {
t.Fatalf("Expected error when not specifying memory")
}
}

View File

@ -14,7 +14,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
@ -83,21 +82,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
}
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)

View File

@ -3,8 +3,6 @@ package driver
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"path/filepath"
"reflect"
"testing"
@ -99,57 +97,6 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) {
handle2.Kill()
}
func TestRawExecDriver_Start_Artifact_basic(t *testing.T) {
t.Parallel()
path := testtask.Path()
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir(path))))
defer ts.Close()
file := filepath.Base(path)
task := &structs.Task{
Name: "sleep",
Config: map[string]interface{}{
"artifact_source": fmt.Sprintf("%s/%s", ts.URL, file),
"command": file,
"args": []string{"sleep", "1s"},
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}
testtask.SetTaskEnv(task)
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewRawExecDriver(driverCtx)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
// Attempt to open
handle2, err := d.Open(execCtx, handle.ID())
if err != nil {
t.Fatalf("err: %v", err)
}
if handle2 == nil {
t.Fatalf("missing handle")
}
// Task should terminate quickly
select {
case <-handle2.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
}
func TestRawExecDriver_Start_Wait(t *testing.T) {
t.Parallel()
task := &structs.Task{

View File

@ -228,7 +228,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
Cmd: exec.Command(bin, "executor", pluginLogFile),
}
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
execIntf, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
@ -241,7 +241,12 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
LogConfig: task.LogConfig,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "rkt", Args: cmdArgs}, executorCtx)
absPath, err := GetAbsolutePath("rkt")
if err != nil {
return nil, err
}
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: cmdArgs}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
@ -251,7 +256,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
pluginClient: pluginClient,
executor: exec,
executor: execIntf,
executorPid: ps.Pid,
allocDir: ctx.AllocDir,
logger: d.logger,

View File

@ -4,6 +4,8 @@ import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
@ -133,3 +135,14 @@ func GetKillTimeout(desired, max time.Duration) time.Duration {
return max
}
// GetAbsolutePath returns the absolute path of the passed binary by resolving
// it in the path and following symlinks.
func GetAbsolutePath(bin string) (string, error) {
lp, err := exec.LookPath(bin)
if err != nil {
return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, err)
}
return filepath.EvalSymlinks(lp)
}

View File

@ -4,14 +4,10 @@ import (
"fmt"
"log"
"net/url"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
gg "github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
@ -24,7 +20,7 @@ var (
supported = []string{"http", "https", "s3"}
)
// getClient returns a client that is suitable for Nomad.
// getClient returns a client that is suitable for Nomad downloading artifacts.
func getClient(src, dst string) *gg.Client {
lock.Lock()
defer lock.Unlock()
@ -42,36 +38,38 @@ func getClient(src, dst string) *gg.Client {
return &gg.Client{
Src: src,
Dst: dst,
Dir: false, // Only support a single file for now.
Mode: gg.ClientModeAny,
Getters: getters,
}
}
func GetArtifact(destDir, source, checksum string, logger *log.Logger) (string, error) {
if source == "" {
return "", fmt.Errorf("Source url is empty in Artifact Getter")
}
u, err := url.Parse(source)
// getGetterUrl returns the go-getter URL to download the artifact.
func getGetterUrl(artifact *structs.TaskArtifact) (string, error) {
u, err := url.Parse(artifact.GetterSource)
if err != nil {
return "", err
return "", fmt.Errorf("failed to parse source URL %q: %v", artifact.GetterSource, err)
}
// if checksum is seperate, apply to source
if checksum != "" {
source = strings.Join([]string{source, fmt.Sprintf("checksum=%s", checksum)}, "?")
logger.Printf("[DEBUG] client.getter: Applying checksum to Artifact Source URL, new url: %s", source)
// Build the url
q := u.Query()
for k, v := range artifact.GetterOptions {
q.Add(k, v)
}
artifactFile := filepath.Join(destDir, path.Base(u.Path))
if err := getClient(source, artifactFile).Get(); err != nil {
return "", fmt.Errorf("Error downloading artifact: %s", err)
}
// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0755); err != nil {
logger.Printf("[ERR] driver.raw_exec: Error making artifact executable: %s", err)
}
}
return artifactFile, nil
u.RawQuery = q.Encode()
return u.String(), nil
}
// GetArtifact downloads an artifact into the specified destination directory.
func GetArtifact(artifact *structs.TaskArtifact, destDir string, logger *log.Logger) error {
url, err := getGetterUrl(artifact)
if err != nil {
return err
}
// Download the artifact
if err := getClient(url, destDir).Get(); err != nil {
return err
}
return nil
}

View File

@ -4,108 +4,149 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestGetArtifact_basic(t *testing.T) {
func TestGetArtifact_FileAndChecksum(t *testing.T) {
// Create the test server hosting the file to download
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/"))))
defer ts.Close()
logger := log.New(os.Stderr, "", log.LstdFlags)
// Create a temp directory to download into
destDir, err := ioutil.TempDir("", "nomad-test")
if err != nil {
t.Fatalf("failed to make temp directory: %v", err)
}
defer os.RemoveAll(destDir)
// TODO: Use http.TestServer to serve these files from fixtures dir
passing := []struct {
Source, Checksum string
}{
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_darwin_amd64",
"sha256:66aa0f05fc0cfcf1e5ed8cc5307b5df51e33871d6b295a60e0f9f6dd573da977",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64",
"sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64",
"md5:a9b14903a8942748e4f8474e11f795d3",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64?checksum=sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c",
"",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64",
"",
// Create the artifact
file := "test.sh"
artifact := &structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, file),
GetterOptions: map[string]string{
"checksum": "md5:bce963762aa2dbfed13caf492a45fb72",
},
}
for i, p := range passing {
destDir, err := ioutil.TempDir("", fmt.Sprintf("nomad-test-%d", i))
if err != nil {
t.Fatalf("Error in TestGetArtifact_basic makeing TempDir: %s", err)
}
// Download the artifact
logger := log.New(os.Stderr, "", log.LstdFlags)
if err := GetArtifact(artifact, destDir, logger); err != nil {
t.Fatalf("GetArtifact failed: %v", err)
}
path, err := GetArtifact(destDir, p.Source, p.Checksum, logger)
if err != nil {
t.Fatalf("TestGetArtifact_basic unexpected failure here: %s", err)
}
// Verify artifact exists
if _, err := os.Stat(filepath.Join(destDir, file)); err != nil {
t.Fatalf("source path error: %s", err)
}
}
if p.Checksum != "" {
if ok := strings.Contains(path, p.Checksum); ok {
t.Fatalf("path result should not contain the checksum, got: %s", path)
func TestGetArtifact_InvalidChecksum(t *testing.T) {
// Create the test server hosting the file to download
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/"))))
defer ts.Close()
// Create a temp directory to download into
destDir, err := ioutil.TempDir("", "nomad-test")
if err != nil {
t.Fatalf("failed to make temp directory: %v", err)
}
defer os.RemoveAll(destDir)
// Create the artifact with an incorrect checksum
file := "test.sh"
artifact := &structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, file),
GetterOptions: map[string]string{
"checksum": "md5:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
},
}
// Download the artifact and expect an error
logger := log.New(os.Stderr, "", log.LstdFlags)
if err := GetArtifact(artifact, destDir, logger); err == nil {
t.Fatalf("GetArtifact should have failed")
}
}
func createContents(basedir string, fileContents map[string]string, t *testing.T) {
for relPath, content := range fileContents {
folder := basedir
if strings.Index(relPath, "/") != -1 {
// Create the folder.
folder = filepath.Join(basedir, filepath.Dir(relPath))
if err := os.Mkdir(folder, 0777); err != nil {
t.Fatalf("failed to make directory: %v", err)
}
}
// verify artifact exists
if _, err := os.Stat(path); err != nil {
t.Fatalf("source path error: %s", err)
// Create a file in the existing folder.
file := filepath.Join(folder, filepath.Base(relPath))
if err := ioutil.WriteFile(file, []byte(content), 0777); err != nil {
t.Fatalf("failed to write data to file %v: %v", file, err)
}
}
}
func TestGetArtifact_fails(t *testing.T) {
func checkContents(basedir string, fileContents map[string]string, t *testing.T) {
for relPath, content := range fileContents {
path := filepath.Join(basedir, relPath)
actual, err := ioutil.ReadFile(path)
if err != nil {
t.Fatalf("failed to read file %q: %v", path, err)
}
if !reflect.DeepEqual(actual, []byte(content)) {
t.Fatalf("%q: expected %q; got %q", path, content, string(actual))
}
}
}
func TestGetArtifact_Archive(t *testing.T) {
// Create the test server hosting the file to download
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/"))))
defer ts.Close()
// Create a temp directory to download into and create some of the same
// files that exist in the artifact to ensure they are overriden
destDir, err := ioutil.TempDir("", "nomad-test")
if err != nil {
t.Fatalf("failed to make temp directory: %v", err)
}
defer os.RemoveAll(destDir)
create := map[string]string{
"exist/my.config": "to be replaced",
"untouched": "existing top-level",
}
createContents(destDir, create, t)
file := "archive.tar.gz"
artifact := &structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, file),
GetterOptions: map[string]string{
"checksum": "sha1:20bab73c72c56490856f913cf594bad9a4d730f6",
},
}
logger := log.New(os.Stderr, "", log.LstdFlags)
failing := []struct {
Source, Checksum string
}{
{
"",
"sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d",
},
{
"/u/47675/jar_thing/hi_darwin_amd64",
"sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_darwin_amd64",
"sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d",
},
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64",
"sha257:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c",
},
// malformed checksum
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64",
"6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c",
},
// 404
{
"https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd86",
"",
},
if err := GetArtifact(artifact, destDir, logger); err != nil {
t.Fatalf("GetArtifact failed: %v", err)
}
for i, p := range failing {
destDir, err := ioutil.TempDir("", fmt.Sprintf("nomad-test-%d", i))
if err != nil {
t.Fatalf("Error in TestGetArtifact_basic makeing TempDir: %s", err)
}
_, err = GetArtifact(destDir, p.Source, p.Checksum, logger)
if err == nil {
t.Fatalf("TestGetArtifact_basic expected failure, but got none")
}
// Verify the unarchiving overrode files properly.
expected := map[string]string{
"untouched": "existing top-level",
"exist/my.config": "hello world\n",
"new/my.config": "hello world\n",
"test.sh": "sleep 1\n",
}
checkContents(destDir, expected, t)
}

BIN
client/getter/test-fixtures/archive.tar.gz (Stored with Git LFS) Normal file

Binary file not shown.

View File

@ -0,0 +1 @@
hello world

View File

@ -0,0 +1 @@
hello world

View File

@ -0,0 +1 @@
sleep 1

View File

@ -0,0 +1 @@
sleep 1

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/hashstructure"
@ -48,6 +49,10 @@ type TaskRunner struct {
handle driver.DriverHandle
handleLock sync.Mutex
// artifactsDownloaded tracks whether the tasks artifacts have been
// downloaded
artifactsDownloaded bool
destroy bool
destroyCh chan struct{}
destroyLock sync.Mutex
@ -146,6 +151,10 @@ func (r *TaskRunner) RestoreState() error {
}
r.handleLock.Lock()
r.handle = handle
// If we have previously created the driver, the artifacts have been
// downloaded.
r.artifactsDownloaded = true
r.handleLock.Unlock()
}
return nil
@ -214,12 +223,40 @@ func (r *TaskRunner) Run() {
}
func (r *TaskRunner) run() {
// Predeclare things so we an jump to the RESTART
var handleEmpty bool
for {
// Download the task's artifacts
if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
if !ok {
err := fmt.Errorf("task directory couldn't be found")
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
// Non-restartable error
return
}
for _, artifact := range r.task.Artifacts {
if err := getter.GetArtifact(artifact, taskDir, r.logger); err != nil {
r.setState(structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
r.restartTracker.SetStartError(cstructs.NewRecoverableError(err, true))
goto RESTART
}
}
r.artifactsDownloaded = true
}
// Start the task if not yet started or it is being forced. This logic
// is necessary because in the case of a restore the handle already
// exists.
r.handleLock.Lock()
handleEmpty := r.handle == nil
handleEmpty = r.handle == nil
r.handleLock.Unlock()
if handleEmpty {
startErr := r.startTask()
@ -277,6 +314,7 @@ func (r *TaskRunner) run() {
RESTART:
state, when := r.restartTracker.GetState()
r.restartTracker.SetStartError(nil).SetWaitResult(nil)
switch state {
case structs.TaskNotRestarting, structs.TaskTerminated:
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)

View File

@ -3,6 +3,8 @@ package client
import (
"fmt"
"log"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
@ -32,12 +34,17 @@ func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEve
}
func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
return testTaskRunnerFromAlloc(restarts, mock.Alloc())
}
// Creates a mock task runner using the first task in the first task group of
// the passed allocation.
func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTaskStateUpdater, *TaskRunner) {
logger := testLogger()
conf := DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
upd := &MockTaskStateUpdater{}
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
// Initialize the port listing. This should be done by the offer process but
@ -48,7 +55,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
allocDir.Build([]*structs.Task{task})
ctx := driver.NewExecContext(allocDir, alloc.ID)
tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, consulClient)
tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc, task, consulClient)
if !restarts {
tr.restartTracker = noRestartsTracker()
}
@ -227,3 +234,134 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) {
t.Fatalf("err: %v", err)
})
}
func TestTaskRunner_Download_List(t *testing.T) {
ctestutil.ExecCompatible(t)
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("."))))
defer ts.Close()
// Create an allocation that has a task with a list of artifacts.
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
f1 := "task_runner_test.go"
f2 := "task_runner.go"
artifact1 := structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
}
artifact2 := structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2),
}
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}
upd, tr := testTaskRunnerFromAlloc(false, alloc)
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", upd.events)
}
if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}
if upd.events[1].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts)
}
if upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskStarted)
}
if upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskTerminated)
}
// Check that both files exist.
taskDir := tr.ctx.AllocDir.TaskDirs[task.Name]
if _, err := os.Stat(filepath.Join(taskDir, f1)); err != nil {
t.Fatalf("%v not downloaded", f1)
}
if _, err := os.Stat(filepath.Join(taskDir, f2)); err != nil {
t.Fatalf("%v not downloaded", f2)
}
}
func TestTaskRunner_Download_Retries(t *testing.T) {
ctestutil.ExecCompatible(t)
// Create an allocation that has a task with bad artifacts.
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
artifact := structs.TaskArtifact{
GetterSource: "http://127.1.1.111:12315/foo/bar/baz",
}
task.Artifacts = []*structs.TaskArtifact{&artifact}
// Make the restart policy try one update
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
Attempts: 1,
Interval: 10 * time.Minute,
Delay: 1 * time.Second,
Mode: structs.RestartPolicyModeFail,
}
upd, tr := testTaskRunnerFromAlloc(true, alloc)
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
select {
case <-tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(upd.events) != 7 {
t.Fatalf("should have 7 updates: %#v", upd.events)
}
if upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
}
if upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
}
if upd.events[1].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts)
}
if upd.events[2].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskArtifactDownloadFailed)
}
if upd.events[3].Type != structs.TaskRestarting {
t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskRestarting)
}
if upd.events[4].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskDownloadingArtifacts)
}
if upd.events[5].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskArtifactDownloadFailed)
}
if upd.events[6].Type != structs.TaskNotRestarting {
t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskNotRestarting)
}
}

View File

@ -205,6 +205,14 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) {
} else {
desc = "Failed to start task"
}
case api.TaskDownloadingArtifacts:
desc = "Client is downloading artifacts"
case api.TaskArtifactDownloadFailed:
if event.DownloadError != "" {
desc = event.DownloadError
} else {
desc = "Failed to download artifacts"
}
case api.TaskKilled:
if event.KillError != "" {
desc = event.KillError

View File

@ -158,6 +158,16 @@ job "example" {
}
}
}
# The artifact block can be specified one or more times to download
$ artifacts prior to the task being started. This is convenient for
$ shipping configs or data needed by the task.
# artifact {
# source = "http://foo.com/artifact.tar.gz"
# options {
# checksum = "md5:c4aa853ad2215426eb7d70a21922e794"
# }
# }
# Specify configuration related to log rotation
# logs {

View File

@ -468,6 +468,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"resources",
"logs",
"kill_timeout",
"artifact",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
@ -484,6 +485,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
delete(m, "meta")
delete(m, "resources")
delete(m, "logs")
delete(m, "artifact")
// Build the task
var t structs.Task
@ -596,12 +598,84 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
}
t.LogConfig = logConfig
// Parse artifacts
if o := listVal.Filter("artifact"); len(o.Items) > 0 {
if err := parseArtifacts(&t.Artifacts, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', artifact ->", n))
}
}
*result = append(*result, &t)
}
return nil
}
func parseArtifacts(result *[]*structs.TaskArtifact, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys
valid := []string{
"source",
"options",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "options")
var ta structs.TaskArtifact
if err := mapstructure.WeakDecode(m, &ta); err != nil {
return err
}
var optionList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
optionList = ot.List
} else {
return fmt.Errorf("artifact should be an object")
}
options := make(map[string]string)
if oo := optionList.Filter("options"); len(oo.Items) > 0 {
if err := parseArtifactOption(options, oo); err != nil {
return multierror.Prefix(err, "options: ")
}
}
ta.GetterOptions = options
*result = append(*result, &ta)
}
return nil
}
func parseArtifactOption(result map[string]string, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'options' block allowed per artifact")
}
// Get our resource object
o := list.Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &result); err != nil {
return err
}
return nil
}
func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error {
task.Services = make([]*structs.Service, len(serviceObjs.Items))
var defaultServiceName bool

View File

@ -128,6 +128,20 @@ func TestParse(t *testing.T) {
MaxFiles: 10,
MaxFileSizeMB: 100,
},
Artifacts: []*structs.TaskArtifact{
{
GetterSource: "http://foo.com/artifact",
GetterOptions: map[string]string{
"checksum": "md5:b8a4f3f72ecab0510a6a31e997461c5f",
},
},
{
GetterSource: "http://bar.com/artifact",
GetterOptions: map[string]string{
"checksum": "md5:ff1cc0d3432dad54d607c1505fb7245c",
},
},
},
},
&structs.Task{
Name: "storagelocker",
@ -301,6 +315,11 @@ func TestParse(t *testing.T) {
},
false,
},
{
"bad-artifact.hcl",
nil,
true,
},
}
for _, tc := range cases {

View File

@ -0,0 +1,13 @@
job "binstore-storagelocker" {
group "binsl" {
count = 5
task "binstore" {
driver = "docker"
artifact {
bad = "bad"
}
resources {}
}
}
}

View File

@ -82,6 +82,20 @@ job "binstore-storagelocker" {
}
kill_timeout = "22s"
artifact {
source = "http://foo.com/artifact"
options {
checksum = "md5:b8a4f3f72ecab0510a6a31e997461c5f"
}
}
artifact {
source = "http://bar.com/artifact"
options {
checksum = "md5:ff1cc0d3432dad54d607c1505fb7245c"
}
}
}
task "storagelocker" {

View File

@ -2,10 +2,15 @@ package structs
import (
"bytes"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"crypto/sha512"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"reflect"
"regexp"
"strconv"
@ -1603,6 +1608,10 @@ type Task struct {
// LogConfig provides configuration for log rotation
LogConfig *LogConfig `mapstructure:"logs"`
// Artifacts is a list of artifacts to download and extract before running
// the task.
Artifacts []*TaskArtifact
}
func (t *Task) Copy() *Task {
@ -1623,6 +1632,12 @@ func (t *Task) Copy() *Task {
nt.Resources = nt.Resources.Copy()
nt.Meta = CopyMapStringString(nt.Meta)
artifacts := make([]*TaskArtifact, len(nt.Artifacts))
for i, a := range nt.Artifacts {
artifacts[i] = a.Copy()
}
nt.Artifacts = artifacts
if i, err := copystructure.Copy(nt.Config); err != nil {
nt.Config = i.(map[string]interface{})
}
@ -1662,6 +1677,72 @@ func (t *Task) FindHostAndPortFor(portLabel string) (string, int) {
return "", 0
}
// Validate is used to sanity check a task
func (t *Task) Validate() error {
var mErr multierror.Error
if t.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task name"))
}
if t.Driver == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task driver"))
}
if t.KillTimeout.Nanoseconds() < 0 {
mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value"))
}
// Validate the resources.
if t.Resources == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing task resources"))
} else if err := t.Resources.MeetsMinResources(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Validate the log config
if t.LogConfig == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config"))
} else if err := t.LogConfig.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
for idx, constr := range t.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
for _, service := range t.Services {
if err := service.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
if t.LogConfig != nil && t.Resources != nil {
logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB)
if t.Resources.DiskMB <= logUsage {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("log storage (%d MB) exceeds requested disk capacity (%d MB)",
logUsage, t.Resources.DiskMB))
}
}
for idx, artifact := range t.Artifacts {
if err := artifact.Validate(); err != nil {
outer := fmt.Errorf("Artifact %d validation failed: %v", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
// If the driver is java or qemu ensure that they have specified an
// artifact.
if (t.Driver == "qemu" || t.Driver == "java") && len(t.Artifacts) == 0 {
err := fmt.Errorf("must specify at least one artifact when using %q driver", t.Driver)
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
@ -1727,6 +1808,14 @@ const (
// TaskNotRestarting indicates that the task has failed and is not being
// restarted because it has exceeded its restart policy.
TaskNotRestarting = "Restarts Exceeded"
// Task Downloading Artifacts means the task is downloading the artifacts
// specified in the task.
TaskDownloadingArtifacts = "Downloading Artifacts"
// TaskArtifactDownloadFailed indicates that downloading the artifacts
// failed.
TaskArtifactDownloadFailed = "Failed Artifact Download"
)
// TaskEvent is an event that effects the state of a task and contains meta-data
@ -1748,6 +1837,9 @@ type TaskEvent struct {
// TaskRestarting fields.
StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
// Artifact Download fields
DownloadError string // Error downloading artifacts
}
func (te *TaskEvent) GoString() string {
@ -1806,54 +1898,88 @@ func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent {
return e
}
// Validate is used to sanity check a task group
func (t *Task) Validate() error {
func (e *TaskEvent) SetDownloadError(err error) *TaskEvent {
if err != nil {
e.DownloadError = err.Error()
}
return e
}
// TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter
GetterSource string `mapstructure:"source"`
// GetterOptions are options to use when downloading the artifact using
// go-getter.
GetterOptions map[string]string `mapstructure:"options"`
}
func (ta *TaskArtifact) Copy() *TaskArtifact {
if ta == nil {
return nil
}
nta := new(TaskArtifact)
*nta = *ta
nta.GetterOptions = CopyMapStringString(ta.GetterOptions)
return nta
}
func (ta *TaskArtifact) Validate() error {
// Verify the source
var mErr multierror.Error
if t.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task name"))
}
if t.Driver == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task driver"))
}
if t.KillTimeout.Nanoseconds() < 0 {
mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value"))
if ta.GetterSource == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified"))
}
// Validate the resources.
if t.Resources == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing task resources"))
} else if err := t.Resources.MeetsMinResources(); err != nil {
mErr.Errors = append(mErr.Errors, err)
_, err := url.Parse(ta.GetterSource)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid source URL %q: %v", ta.GetterSource, err))
}
// Validate the log config
if t.LogConfig == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config"))
} else if err := t.LogConfig.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Verify the checksum
if check, ok := ta.GetterOptions["checksum"]; ok {
check = strings.TrimSpace(check)
if check == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("checksum value can not be empty"))
return mErr.ErrorOrNil()
}
for idx, constr := range t.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
parts := strings.Split(check, ":")
if l := len(parts); l != 2 {
mErr.Errors = append(mErr.Errors, fmt.Errorf(`checksum must be given as "type:value"; got %q`, check))
return mErr.ErrorOrNil()
}
checksumVal := parts[1]
checksumBytes, err := hex.DecodeString(checksumVal)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid checksum: %v", err))
return mErr.ErrorOrNil()
}
checksumType := parts[0]
expectedLength := 0
switch checksumType {
case "md5":
expectedLength = md5.Size
case "sha1":
expectedLength = sha1.Size
case "sha256":
expectedLength = sha256.Size
case "sha512":
expectedLength = sha512.Size
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("unsupported checksum type: %s", checksumType))
return mErr.ErrorOrNil()
}
if len(checksumBytes) != expectedLength {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid %s checksum: %v", checksumType, checksumVal))
return mErr.ErrorOrNil()
}
}
for _, service := range t.Services {
if err := service.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
if t.LogConfig != nil && t.Resources != nil {
logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB)
if t.Resources.DiskMB <= logUsage {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("log storage (%d MB) exceeds requested disk capacity (%d MB)",
logUsage, t.Resources.DiskMB))
}
}
return mErr.ErrorOrNil()
}

View File

@ -131,6 +131,11 @@ func testJob() *Job {
Env: map[string]string{
"FOO": "bar",
},
Artifacts: []*TaskArtifact{
{
GetterSource: "http://foo.com",
},
},
Services: []*Service{
{
Name: "${TASK}-frontend",
@ -763,3 +768,53 @@ func TestAllocation_Index(t *testing.T) {
t.Fatal()
}
}
func TestTaskArtifact_Validate_Source(t *testing.T) {
valid := &TaskArtifact{GetterSource: "google.com"}
if err := valid.Validate(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestTaskArtifact_Validate_Checksum(t *testing.T) {
cases := []struct {
Input *TaskArtifact
Err bool
}{
{
&TaskArtifact{
GetterSource: "foo.com",
GetterOptions: map[string]string{
"checksum": "no-type",
},
},
true,
},
{
&TaskArtifact{
GetterSource: "foo.com",
GetterOptions: map[string]string{
"checksum": "md5:toosmall",
},
},
true,
},
{
&TaskArtifact{
GetterSource: "foo.com",
GetterOptions: map[string]string{
"checksum": "invalid:type",
},
},
true,
},
}
for i, tc := range cases {
err := tc.Input.Validate()
if (err != nil) != tc.Err {
t.Fatalf("case %d: %v", i, err)
continue
}
}
}

View File

@ -299,6 +299,9 @@ func tasksUpdated(a, b *structs.TaskGroup) bool {
if !reflect.DeepEqual(at.Env, bt.Env) {
return true
}
if !reflect.DeepEqual(at.Artifacts, bt.Artifacts) {
return true
}
// Inspect the network to see if the dynamic ports are different
if len(at.Resources.Networks) != len(bt.Resources.Networks) {

View File

@ -10,4 +10,4 @@ go build -o $TEMPDIR/nomad || exit 1
# Run the tests
echo "--> Running tests"
go list ./... | grep -v '/vendor/' | sudo -E PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=180s
go list ./... | grep -v '/vendor/' | sudo -E PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=300s

View File

@ -268,6 +268,10 @@ The `task` object supports the following keys:
* `logs` - Logs allows configuring log rotation for the `stdout` and `stderr`
buffers of a Task. See the log rotation reference below for more details.
* `artifact` - Defines an artifact to be downloaded before the task is run. This
can be provided multiple times to define additional artifacts to download. See
the artifacts reference for more details.
### Resources
The `resources` object supports the following keys:
@ -405,6 +409,40 @@ In the above example we have asked Nomad to retain 3 rotated files for both
`stderr` and `stdout` and size of each file is 10MB. The minimum disk space that
would be required for the task would be 60MB.
### Artifact
Nomad downloads artifacts using
[`go-getter`](https://github.com/hashicorp/go-getter). The `go-getter` library
allows downloading of artifacts from various sources using a URL as the input
source. The key/value pairs given in the `options` block map directly to
parameters appended to the supplied `source` url. These are then used by
`go-getter` to appropriately download the artifact. `go-getter` also has a CLI
tool to validate its URL and can be used to check if the Nomad `artifact` is
valid.
Nomad allows downloading `http`, `https`, and `S3` artifacts. If these artifacts
are archives (zip, tar.gz, bz2, etc.), these will be unarchived before the task
is started.
The `artifact` object maps supports the following keys:
* `source` - The path to the artifact to download.
* `options` - The `options` block allows setting parameters for `go-getter`. An
example is given below:
```
options {
# Validate the downloaded artifact
checksum = "md5:c4aa853ad2215426eb7d70a21922e794"
# S3 options for downloading artifacts from S3
aws_access_key_id = "<id>"
aws_access_key_secret = "<secret>"
aws_access_token = "<token>"
}
```
## JSON Syntax
Job files can also be specified in JSON. The conversion is straightforward