Download artifacts and remove old code for drivers
This commit is contained in:
parent
081556db93
commit
9f878a16bf
33
api/tasks.go
33
api/tasks.go
|
@ -153,24 +153,27 @@ type TaskState struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TaskDriverFailure = "Driver Failure"
|
TaskDriverFailure = "Driver Failure"
|
||||||
TaskReceived = "Received"
|
TaskReceived = "Received"
|
||||||
TaskStarted = "Started"
|
TaskStarted = "Started"
|
||||||
TaskTerminated = "Terminated"
|
TaskTerminated = "Terminated"
|
||||||
TaskKilled = "Killed"
|
TaskKilled = "Killed"
|
||||||
TaskRestarting = "Restarting"
|
TaskRestarting = "Restarting"
|
||||||
TaskNotRestarting = "Restarts Exceeded"
|
TaskNotRestarting = "Restarts Exceeded"
|
||||||
|
TaskDownloadingArtifacts = "Downloading Artifacts"
|
||||||
|
TaskArtifactDownloadFailed = "Failed Artifact Download"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TaskEvent is an event that effects the state of a task and contains meta-data
|
// TaskEvent is an event that effects the state of a task and contains meta-data
|
||||||
// appropriate to the events type.
|
// appropriate to the events type.
|
||||||
type TaskEvent struct {
|
type TaskEvent struct {
|
||||||
Type string
|
Type string
|
||||||
Time int64
|
Time int64
|
||||||
DriverError string
|
DriverError string
|
||||||
ExitCode int
|
ExitCode int
|
||||||
Signal int
|
Signal int
|
||||||
Message string
|
Message string
|
||||||
KillError string
|
KillError string
|
||||||
StartDelay int64
|
StartDelay int64
|
||||||
|
DownloadError string
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,6 @@ import (
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
"github.com/hashicorp/nomad/client/driver/executor"
|
"github.com/hashicorp/nomad/client/driver/executor"
|
||||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||||
"github.com/hashicorp/nomad/client/getter"
|
|
||||||
"github.com/hashicorp/nomad/helper/discover"
|
"github.com/hashicorp/nomad/helper/discover"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
@ -28,10 +27,8 @@ type ExecDriver struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExecDriverConfig struct {
|
type ExecDriverConfig struct {
|
||||||
ArtifactSource string `mapstructure:"artifact_source"`
|
Command string `mapstructure:"command"`
|
||||||
Checksum string `mapstructure:"checksum"`
|
Args []string `mapstructure:"args"`
|
||||||
Command string `mapstructure:"command"`
|
|
||||||
Args []string `mapstructure:"args"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// execHandle is returned from Start/Open as a handle to the PID
|
// execHandle is returned from Start/Open as a handle to the PID
|
||||||
|
@ -89,21 +86,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||||
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if an artificat is specified and attempt to download it
|
|
||||||
source, ok := task.Config["artifact_source"]
|
|
||||||
if ok && source != "" {
|
|
||||||
// Proceed to download an artifact to be executed.
|
|
||||||
_, err := getter.GetArtifact(
|
|
||||||
taskDir,
|
|
||||||
driverConfig.ArtifactSource,
|
|
||||||
driverConfig.Checksum,
|
|
||||||
d.logger,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bin, err := discover.NomadExecutable()
|
bin, err := discover.NomadExecutable()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/hashicorp/nomad/client/driver/executor"
|
"github.com/hashicorp/nomad/client/driver/executor"
|
||||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||||
"github.com/hashicorp/nomad/client/fingerprint"
|
"github.com/hashicorp/nomad/client/fingerprint"
|
||||||
"github.com/hashicorp/nomad/client/getter"
|
|
||||||
"github.com/hashicorp/nomad/helper/discover"
|
"github.com/hashicorp/nomad/helper/discover"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
)
|
)
|
||||||
|
@ -34,10 +33,9 @@ type JavaDriver struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type JavaDriverConfig struct {
|
type JavaDriverConfig struct {
|
||||||
JvmOpts []string `mapstructure:"jvm_options"`
|
JarPath string `mapstructure:"jar_path"`
|
||||||
ArtifactSource string `mapstructure:"artifact_source"`
|
JvmOpts []string `mapstructure:"jvm_options"`
|
||||||
Checksum string `mapstructure:"checksum"`
|
Args []string `mapstructure:"args"`
|
||||||
Args []string `mapstructure:"args"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// javaHandle is returned from Start/Open as a handle to the PID
|
// javaHandle is returned from Start/Open as a handle to the PID
|
||||||
|
@ -124,19 +122,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||||
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proceed to download an artifact to be executed.
|
if driverConfig.JarPath == "" {
|
||||||
path, err := getter.GetArtifact(
|
return nil, fmt.Errorf("jar_path must be specified")
|
||||||
taskDir,
|
|
||||||
driverConfig.ArtifactSource,
|
|
||||||
driverConfig.Checksum,
|
|
||||||
d.logger,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
jarName := filepath.Base(path)
|
|
||||||
|
|
||||||
args := []string{}
|
args := []string{}
|
||||||
// Look for jvm options
|
// Look for jvm options
|
||||||
if len(driverConfig.JvmOpts) != 0 {
|
if len(driverConfig.JvmOpts) != 0 {
|
||||||
|
@ -145,7 +134,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the argument list.
|
// Build the argument list.
|
||||||
args = append(args, "-jar", jarName)
|
args = append(args, "-jar", driverConfig.JarPath)
|
||||||
if len(driverConfig.Args) != 0 {
|
if len(driverConfig.Args) != 0 {
|
||||||
args = append(args, driverConfig.Args...)
|
args = append(args, driverConfig.Args...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ import (
|
||||||
"github.com/hashicorp/nomad/client/driver/executor"
|
"github.com/hashicorp/nomad/client/driver/executor"
|
||||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||||
"github.com/hashicorp/nomad/client/fingerprint"
|
"github.com/hashicorp/nomad/client/fingerprint"
|
||||||
"github.com/hashicorp/nomad/client/getter"
|
|
||||||
"github.com/hashicorp/nomad/helper/discover"
|
"github.com/hashicorp/nomad/helper/discover"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
@ -36,10 +35,9 @@ type QemuDriver struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type QemuDriverConfig struct {
|
type QemuDriverConfig struct {
|
||||||
ArtifactSource string `mapstructure:"artifact_source"`
|
ImagePath string `mapstructure:"image_path"`
|
||||||
Checksum string `mapstructure:"checksum"`
|
Accelerator string `mapstructure:"accelerator"`
|
||||||
Accelerator string `mapstructure:"accelerator"`
|
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
|
||||||
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// qemuHandle is returned from Start/Open as a handle to the PID
|
// qemuHandle is returned from Start/Open as a handle to the PID
|
||||||
|
@ -98,36 +96,19 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the image source
|
// Get the image source
|
||||||
source, ok := task.Config["artifact_source"]
|
vmPath := driverConfig.ImagePath
|
||||||
if !ok || source == "" {
|
if vmPath == "" {
|
||||||
return nil, fmt.Errorf("Missing source image Qemu driver")
|
return nil, fmt.Errorf("image_path must be set")
|
||||||
}
|
|
||||||
|
|
||||||
// Qemu defaults to 128M of RAM for a given VM. Instead, we force users to
|
|
||||||
// supply a memory size in the tasks resources
|
|
||||||
if task.Resources == nil || task.Resources.MemoryMB == 0 {
|
|
||||||
return nil, fmt.Errorf("Missing required Task Resource: Memory")
|
|
||||||
}
|
}
|
||||||
|
vmID := filepath.Base(vmPath)
|
||||||
|
|
||||||
// Get the tasks local directory.
|
// Get the tasks local directory.
|
||||||
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
|
taskName := d.DriverContext.taskName
|
||||||
|
taskDir, ok := ctx.AllocDir.TaskDirs[taskName]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proceed to download an artifact to be executed.
|
|
||||||
vmPath, err := getter.GetArtifact(
|
|
||||||
taskDir,
|
|
||||||
driverConfig.ArtifactSource,
|
|
||||||
driverConfig.Checksum,
|
|
||||||
d.logger,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
vmID := filepath.Base(vmPath)
|
|
||||||
|
|
||||||
// Parse configuration arguments
|
// Parse configuration arguments
|
||||||
// Create the base arguments
|
// Create the base arguments
|
||||||
accelerator := "tcg"
|
accelerator := "tcg"
|
||||||
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/hashicorp/nomad/client/driver/executor"
|
"github.com/hashicorp/nomad/client/driver/executor"
|
||||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||||
"github.com/hashicorp/nomad/client/fingerprint"
|
"github.com/hashicorp/nomad/client/fingerprint"
|
||||||
"github.com/hashicorp/nomad/client/getter"
|
|
||||||
"github.com/hashicorp/nomad/helper/discover"
|
"github.com/hashicorp/nomad/helper/discover"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
@ -83,21 +82,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if an artificat is specified and attempt to download it
|
|
||||||
source, ok := task.Config["artifact_source"]
|
|
||||||
if ok && source != "" {
|
|
||||||
// Proceed to download an artifact to be executed.
|
|
||||||
_, err := getter.GetArtifact(
|
|
||||||
taskDir,
|
|
||||||
driverConfig.ArtifactSource,
|
|
||||||
driverConfig.Checksum,
|
|
||||||
d.logger,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bin, err := discover.NomadExecutable()
|
bin, err := discover.NomadExecutable()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
||||||
|
|
|
@ -68,7 +68,7 @@ func GetArtifact(artifact *structs.TaskArtifact, destDir string, logger *log.Log
|
||||||
|
|
||||||
// Download the artifact
|
// Download the artifact
|
||||||
if err := getClient(url, destDir).Get(); err != nil {
|
if err := getClient(url, destDir).Get(); err != nil {
|
||||||
return fmt.Errorf("error downloading artifact (url: %q): %v", url, err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
"github.com/hashicorp/nomad/client/driver"
|
"github.com/hashicorp/nomad/client/driver"
|
||||||
|
"github.com/hashicorp/nomad/client/getter"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
"github.com/mitchellh/hashstructure"
|
"github.com/mitchellh/hashstructure"
|
||||||
|
|
||||||
|
@ -48,6 +49,10 @@ type TaskRunner struct {
|
||||||
handle driver.DriverHandle
|
handle driver.DriverHandle
|
||||||
handleLock sync.Mutex
|
handleLock sync.Mutex
|
||||||
|
|
||||||
|
// artifactsDownloaded tracks whether the tasks artifacts have been
|
||||||
|
// downloaded
|
||||||
|
artifactsDownloaded bool
|
||||||
|
|
||||||
destroy bool
|
destroy bool
|
||||||
destroyCh chan struct{}
|
destroyCh chan struct{}
|
||||||
destroyLock sync.Mutex
|
destroyLock sync.Mutex
|
||||||
|
@ -146,6 +151,10 @@ func (r *TaskRunner) RestoreState() error {
|
||||||
}
|
}
|
||||||
r.handleLock.Lock()
|
r.handleLock.Lock()
|
||||||
r.handle = handle
|
r.handle = handle
|
||||||
|
|
||||||
|
// If we have previously created the driver, the artifacts have been
|
||||||
|
// downloaded.
|
||||||
|
r.artifactsDownloaded = true
|
||||||
r.handleLock.Unlock()
|
r.handleLock.Unlock()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -214,12 +223,40 @@ func (r *TaskRunner) Run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *TaskRunner) run() {
|
func (r *TaskRunner) run() {
|
||||||
|
// Predeclare things so we an jump to the RESTART
|
||||||
|
var handleEmpty bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
// Download the task's artifacts
|
||||||
|
if !r.artifactsDownloaded {
|
||||||
|
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
|
||||||
|
taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
|
||||||
|
if !ok {
|
||||||
|
err := fmt.Errorf("task directory couldn't be found")
|
||||||
|
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
|
||||||
|
r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
|
||||||
|
|
||||||
|
// Non-restartable error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, artifact := range r.task.Artifacts {
|
||||||
|
if err := getter.GetArtifact(artifact, taskDir, r.logger); err != nil {
|
||||||
|
r.setState(structs.TaskStateDead,
|
||||||
|
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
|
||||||
|
r.restartTracker.SetStartError(cstructs.NewRecoverableError(err, true))
|
||||||
|
goto RESTART
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r.artifactsDownloaded = true
|
||||||
|
}
|
||||||
|
|
||||||
// Start the task if not yet started or it is being forced. This logic
|
// Start the task if not yet started or it is being forced. This logic
|
||||||
// is necessary because in the case of a restore the handle already
|
// is necessary because in the case of a restore the handle already
|
||||||
// exists.
|
// exists.
|
||||||
r.handleLock.Lock()
|
r.handleLock.Lock()
|
||||||
handleEmpty := r.handle == nil
|
handleEmpty = r.handle == nil
|
||||||
r.handleLock.Unlock()
|
r.handleLock.Unlock()
|
||||||
if handleEmpty {
|
if handleEmpty {
|
||||||
startErr := r.startTask()
|
startErr := r.startTask()
|
||||||
|
|
|
@ -205,6 +205,14 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) {
|
||||||
} else {
|
} else {
|
||||||
desc = "Failed to start task"
|
desc = "Failed to start task"
|
||||||
}
|
}
|
||||||
|
case api.TaskDownloadingArtifacts:
|
||||||
|
desc = "Client is downloading artifacts"
|
||||||
|
case api.TaskArtifactDownloadFailed:
|
||||||
|
if event.DownloadError != "" {
|
||||||
|
desc = event.DownloadError
|
||||||
|
} else {
|
||||||
|
desc = "Failed to download artifacts"
|
||||||
|
}
|
||||||
case api.TaskKilled:
|
case api.TaskKilled:
|
||||||
if event.KillError != "" {
|
if event.KillError != "" {
|
||||||
desc = event.KillError
|
desc = event.KillError
|
||||||
|
|
|
@ -1801,6 +1801,14 @@ const (
|
||||||
// TaskNotRestarting indicates that the task has failed and is not being
|
// TaskNotRestarting indicates that the task has failed and is not being
|
||||||
// restarted because it has exceeded its restart policy.
|
// restarted because it has exceeded its restart policy.
|
||||||
TaskNotRestarting = "Restarts Exceeded"
|
TaskNotRestarting = "Restarts Exceeded"
|
||||||
|
|
||||||
|
// Task Downloading Artifacts means the task is downloading the artifacts
|
||||||
|
// specified in the task.
|
||||||
|
TaskDownloadingArtifacts = "Downloading Artifacts"
|
||||||
|
|
||||||
|
// TaskArtifactDownloadFailed indicates that downloading the artifacts
|
||||||
|
// failed.
|
||||||
|
TaskArtifactDownloadFailed = "Failed Artifact Download"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TaskEvent is an event that effects the state of a task and contains meta-data
|
// TaskEvent is an event that effects the state of a task and contains meta-data
|
||||||
|
@ -1822,6 +1830,9 @@ type TaskEvent struct {
|
||||||
|
|
||||||
// TaskRestarting fields.
|
// TaskRestarting fields.
|
||||||
StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
|
StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
|
||||||
|
|
||||||
|
// Artifact Download fields
|
||||||
|
DownloadError string // Error downloading artifacts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (te *TaskEvent) GoString() string {
|
func (te *TaskEvent) GoString() string {
|
||||||
|
@ -1880,6 +1891,13 @@ func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *TaskEvent) SetDownloadError(err error) *TaskEvent {
|
||||||
|
if err != nil {
|
||||||
|
e.DownloadError = err.Error()
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
// TaskArtifact is an artifact to download before running the task.
|
// TaskArtifact is an artifact to download before running the task.
|
||||||
type TaskArtifact struct {
|
type TaskArtifact struct {
|
||||||
// GetterSource is the source to download an artifact using go-getter
|
// GetterSource is the source to download an artifact using go-getter
|
||||||
|
|
Loading…
Reference in a new issue