diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 1846403ac..6df5fe64f 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -55,26 +55,26 @@ const ( // TaskEnvironment is used to expose information to a task via environment // variables and provide interpolation of Nomad variables. type TaskEnvironment struct { - env map[string]string - meta map[string]string - allocDir string - taskDir string - cpuLimit int - memLimit int - node *structs.Node - networks []*structs.NetworkResource - portMap map[string]int + Env map[string]string + Meta map[string]string + AllocDir string + TaskDir string + CpuLimit int + MemLimit int + Node *structs.Node + Networks []*structs.NetworkResource + PortMap map[string]int // taskEnv is the variables that will be set in the tasks environment - taskEnv map[string]string + TaskEnv map[string]string // nodeValues is the values that are allowed for interprolation from the // node. - nodeValues map[string]string + NodeValues map[string]string } func NewTaskEnvironment(node *structs.Node) *TaskEnvironment { - return &TaskEnvironment{node: node} + return &TaskEnvironment{Node: node} } // ParseAndReplace takes the user supplied args replaces any instance of an @@ -82,7 +82,7 @@ func NewTaskEnvironment(node *structs.Node) *TaskEnvironment { func (t *TaskEnvironment) ParseAndReplace(args []string) []string { replaced := make([]string, len(args)) for i, arg := range args { - replaced[i] = hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues) + replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues) } return replaced @@ -92,75 +92,75 @@ func (t *TaskEnvironment) ParseAndReplace(args []string) []string { // and nomad variables. If the variable is found in the passed map it is // replaced, otherwise the original string is returned. func (t *TaskEnvironment) ReplaceEnv(arg string) string { - return hargs.ReplaceEnv(arg, t.taskEnv, t.nodeValues) + return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues) } // Build must be called after all the tasks environment values have been set. func (t *TaskEnvironment) Build() *TaskEnvironment { - t.nodeValues = make(map[string]string) - t.taskEnv = make(map[string]string) + t.NodeValues = make(map[string]string) + t.TaskEnv = make(map[string]string) // Build the task metadata - for k, v := range t.meta { - t.taskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v + for k, v := range t.Meta { + t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v } // Build the ports - for _, network := range t.networks { - for label, value := range network.MapLabelToValues(t.portMap) { + for _, network := range t.Networks { + for label, value := range network.MapLabelToValues(t.PortMap) { IPPort := fmt.Sprintf("%s:%d", network.IP, value) - t.taskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort + t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort // Pass an explicit port mapping to the environment - if port, ok := t.portMap[label]; ok { - t.taskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port) + if port, ok := t.PortMap[label]; ok { + t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(port) } } } // Build the directories - if t.allocDir != "" { - t.taskEnv[AllocDir] = t.allocDir + if t.AllocDir != "" { + t.TaskEnv[AllocDir] = t.AllocDir } - if t.taskDir != "" { - t.taskEnv[TaskLocalDir] = t.taskDir + if t.TaskDir != "" { + t.TaskEnv[TaskLocalDir] = t.TaskDir } // Build the resource limits - if t.memLimit != 0 { - t.taskEnv[MemLimit] = strconv.Itoa(t.memLimit) + if t.MemLimit != 0 { + t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit) } - if t.cpuLimit != 0 { - t.taskEnv[CpuLimit] = strconv.Itoa(t.cpuLimit) + if t.CpuLimit != 0 { + t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit) } // Build the node - if t.node != nil { + if t.Node != nil { // Set up the node values. - t.nodeValues[nodeIdKey] = t.node.ID - t.nodeValues[nodeDcKey] = t.node.Datacenter - t.nodeValues[nodeNameKey] = t.node.Name - t.nodeValues[nodeClassKey] = t.node.NodeClass + t.NodeValues[nodeIdKey] = t.Node.ID + t.NodeValues[nodeDcKey] = t.Node.Datacenter + t.NodeValues[nodeNameKey] = t.Node.Name + t.NodeValues[nodeClassKey] = t.Node.NodeClass // Set up the attributes. - for k, v := range t.node.Attributes { - t.nodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v + for k, v := range t.Node.Attributes { + t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v } // Set up the meta. - for k, v := range t.node.Meta { - t.nodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v + for k, v := range t.Node.Meta { + t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v } } // Interpret the environment variables - interpreted := make(map[string]string, len(t.env)) - for k, v := range t.env { - interpreted[k] = hargs.ReplaceEnv(v, t.nodeValues, t.taskEnv) + interpreted := make(map[string]string, len(t.Env)) + for k, v := range t.Env { + interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv) } for k, v := range interpreted { - t.taskEnv[k] = v + t.TaskEnv[k] = v } return t @@ -169,7 +169,7 @@ func (t *TaskEnvironment) Build() *TaskEnvironment { // EnvList returns a list of strings with NAME=value pairs. func (t *TaskEnvironment) EnvList() []string { env := []string{} - for k, v := range t.taskEnv { + for k, v := range t.TaskEnv { env = append(env, fmt.Sprintf("%s=%s", k, v)) } @@ -178,8 +178,8 @@ func (t *TaskEnvironment) EnvList() []string { // EnvMap returns a copy of the tasks environment variables. func (t *TaskEnvironment) EnvMap() map[string]string { - m := make(map[string]string, len(t.taskEnv)) - for k, v := range t.taskEnv { + m := make(map[string]string, len(t.TaskEnv)) + for k, v := range t.TaskEnv { m[k] = v } @@ -188,95 +188,95 @@ func (t *TaskEnvironment) EnvMap() map[string]string { // Builder methods to build the TaskEnvironment func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment { - t.allocDir = dir + t.AllocDir = dir return t } func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment { - t.allocDir = "" + t.AllocDir = "" return t } func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment { - t.taskDir = dir + t.TaskDir = dir return t } func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment { - t.taskDir = "" + t.TaskDir = "" return t } func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment { - t.memLimit = limit + t.MemLimit = limit return t } func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment { - t.memLimit = 0 + t.MemLimit = 0 return t } func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment { - t.cpuLimit = limit + t.CpuLimit = limit return t } func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment { - t.cpuLimit = 0 + t.CpuLimit = 0 return t } func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment { - t.networks = networks + t.Networks = networks return t } func (t *TaskEnvironment) clearNetworks() *TaskEnvironment { - t.networks = nil + t.Networks = nil return t } func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment { - t.portMap = portMap + t.PortMap = portMap return t } func (t *TaskEnvironment) clearPortMap() *TaskEnvironment { - t.portMap = nil + t.PortMap = nil return t } // Takes a map of meta values to be passed to the task. The keys are capatilized // when the environent variable is set. func (t *TaskEnvironment) SetMeta(m map[string]string) *TaskEnvironment { - t.meta = m + t.Meta = m return t } func (t *TaskEnvironment) ClearMeta() *TaskEnvironment { - t.meta = nil + t.Meta = nil return t } func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment { - t.env = m + t.Env = m return t } // Appends the given environment variables. func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment { - if t.env == nil { - t.env = make(map[string]string, len(m)) + if t.Env == nil { + t.Env = make(map[string]string, len(m)) } for k, v := range m { - t.env[k] = v + t.Env[k] = v } return t } func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment { - t.env = nil + t.Env = nil return t } diff --git a/client/driver/exec.go b/client/driver/exec.go index ca8059c4f..e80134c3e 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -5,21 +5,20 @@ import ( "fmt" "log" "os/exec" - //"path/filepath" + "path/filepath" "syscall" "time" "github.com/hashicorp/go-plugin" - //"github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" "github.com/hashicorp/nomad/client/driver/plugins" cstructs "github.com/hashicorp/nomad/client/driver/structs" - //"github.com/hashicorp/nomad/client/getter" - "github.com/hashicorp/nomad/nomad/structs" - //"github.com/mitchellh/mapstructure" - + "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/helper/discover" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/mapstructure" ) // ExecDriver fork/execs tasks using as many of the underlying OS's isolation @@ -70,77 +69,57 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) { } func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { - // var driverConfig ExecDriverConfig - // if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { - // return nil, err - // } - // // Get the command to be ran - // command := driverConfig.Command - // if command == "" { - // return nil, fmt.Errorf("missing command for exec driver") - // } - // - // // Create a location to download the artifact. - // taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] - // if !ok { - // return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) - // } - // - // // Check if an artificat is specified and attempt to download it - // source, ok := task.Config["artifact_source"] - // if ok && source != "" { - // // Proceed to download an artifact to be executed. - // _, err := getter.GetArtifact( - // filepath.Join(taskDir, allocdir.TaskLocal), - // driverConfig.ArtifactSource, - // driverConfig.Checksum, - // d.logger, - // ) - // if err != nil { - // return nil, err - // } - // } - // - // // Setup the command - // execCtx := executor.NewExecutorContext(d.taskEnv) - // cmd := executor.Command(execCtx, command, driverConfig.Args...) - // if err := cmd.Limit(task.Resources); err != nil { - // return nil, fmt.Errorf("failed to constrain resources: %s", err) - // } - // - // // Populate environment variables - // cmd.Command().Env = d.taskEnv.EnvList() - // - // if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil { - // return nil, fmt.Errorf("failed to configure task directory: %v", err) - // } - // - // if err := cmd.Start(); err != nil { - // return nil, fmt.Errorf("failed to start command: %v", err) - // } - // + var driverConfig ExecDriverConfig + if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { + return nil, err + } + // Get the command to be ran + command := driverConfig.Command + if command == "" { + return nil, fmt.Errorf("missing command for exec driver") + } + + // Create a location to download the artifact. + taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] + if !ok { + return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) + } + + // Check if an artificat is specified and attempt to download it + source, ok := task.Config["artifact_source"] + if ok && source != "" { + // Proceed to download an artifact to be executed. + _, err := getter.GetArtifact( + filepath.Join(taskDir, allocdir.TaskLocal), + driverConfig.ArtifactSource, + driverConfig.Checksum, + d.logger, + ) + if err != nil { + return nil, err + } + } bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - executorClient := plugin.NewClient(&plugin.ClientConfig{ + pluginConfig := &plugin.ClientConfig{ HandshakeConfig: plugins.HandshakeConfig, Plugins: plugins.PluginMap, Cmd: exec.Command(bin, "executor"), - }) - - rpcClient, err := executorClient.Client() - if err != nil { - return nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err) } - raw, err := rpcClient.Dispense("executor") + executor, pluginClient, err := d.executor(pluginConfig) if err != nil { - return nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) + return nil, err } - executorPlugin := raw.(plugins.Executor) - ps, err := executorPlugin.LaunchCmd(exec.Command("/bin/echo", "hello"), &plugins.ExecutorContext{}) + executorCtx := &plugins.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, + } + ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) if err != nil { return nil, fmt.Errorf("error starting process via the plugin: %v", err) } @@ -148,8 +127,8 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // Return a driver handle h := &execHandle{ - pluginClient: executorClient, - executor: executorPlugin, + pluginClient: pluginClient, + executor: executor, //cmd: cmd, killTimeout: d.DriverContext.KillTimeout(task), logger: d.logger, @@ -178,10 +157,16 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro // if err != nil { // return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) // } + + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginConfig := &plugin.ClientConfig{ HandshakeConfig: plugins.HandshakeConfig, Plugins: plugins.PluginMap, - Cmd: exec.Command("/home/diptanuc/Projects/gocode/bin/nomad"), + Cmd: exec.Command(bin, "executor"), Reattach: id.PluginConfig, } executor, client, err := d.executor(pluginConfig) @@ -208,6 +193,7 @@ func (d *ExecDriver) executor(config *plugin.ClientConfig) (plugins.Executor, *p if err != nil { return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err) } + rpcClient.SyncStreams(d.config.LogOutput, d.config.LogOutput) raw, err := rpcClient.Dispense("executor") if err != nil { @@ -248,7 +234,7 @@ func (h *execHandle) Kill() error { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - _, err := h.executor.Exit() + err := h.executor.Exit() return err } } diff --git a/client/driver/plugins/executor.go b/client/driver/plugins/executor.go index a8c918036..10530f6e9 100644 --- a/client/driver/plugins/executor.go +++ b/client/driver/plugins/executor.go @@ -1,11 +1,16 @@ package plugins import ( + "log" "net/rpc" - "os/exec" + "os" "time" "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/nomad/structs" ) var HandshakeConfig = plugin.HandshakeConfig{ @@ -19,6 +24,16 @@ var PluginMap = map[string]plugin.Plugin{ } type ExecutorContext struct { + TaskEnv *env.TaskEnvironment + AllocDir *allocdir.AllocDir + Task *structs.Task + Chroot bool + Limits bool +} + +type ExecCommand struct { + Cmd string + Args []string } type ProcessState struct { @@ -28,10 +43,10 @@ type ProcessState struct { } type Executor interface { - LaunchCmd(cmd *exec.Cmd, ctx *ExecutorContext) (*ProcessState, error) + LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) Wait() (*ProcessState, error) - ShutDown() (*ProcessState, error) - Exit() (*ProcessState, error) + ShutDown() error + Exit() error } type ExecutorRPC struct { @@ -39,11 +54,11 @@ type ExecutorRPC struct { } type LaunchCmdArgs struct { - Cmd *exec.Cmd + Cmd *ExecCommand Ctx *ExecutorContext } -func (e *ExecutorRPC) LaunchCmd(cmd *exec.Cmd, ctx *ExecutorContext) (*ProcessState, error) { +func (e *ExecutorRPC) LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) { var ps ProcessState err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) return &ps, err @@ -55,16 +70,16 @@ func (e *ExecutorRPC) Wait() (*ProcessState, error) { return &ps, err } -func (e *ExecutorRPC) ShutDown() (*ProcessState, error) { +func (e *ExecutorRPC) ShutDown() error { var ps ProcessState err := e.client.Call("Plugin.ShutDown", new(interface{}), &ps) - return &ps, err + return err } -func (e *ExecutorRPC) Exit() (*ProcessState, error) { +func (e *ExecutorRPC) Exit() error { var ps ProcessState err := e.client.Call("Plugin.Exit", new(interface{}), &ps) - return &ps, err + return err } type ExecutorRPCServer struct { @@ -85,22 +100,26 @@ func (e *ExecutorRPCServer) Wait(args interface{}, ps *ProcessState) error { func (e *ExecutorRPCServer) ShutDown(args interface{}, ps *ProcessState) error { var err error - ps, err = e.Impl.ShutDown() + err = e.Impl.ShutDown() return err } func (e *ExecutorRPCServer) Exit(args interface{}, ps *ProcessState) error { var err error - ps, err = e.Impl.Exit() + err = e.Impl.Exit() return err } -type ExecutorPlugin struct{} +type ExecutorPlugin struct { + logger *log.Logger +} func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { - return &ExecutorRPCServer{Impl: NewExecutor()}, nil + p.logger = log.New(os.Stdout, "executor-plugin-server:", log.LstdFlags) + return &ExecutorRPCServer{Impl: NewExecutor(p.logger)}, nil } func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { + p.logger = log.New(os.Stdout, "executor-plugin-client:", log.LstdFlags) return &ExecutorRPC{client: c}, nil } diff --git a/client/driver/plugins/executor_basic.go b/client/driver/plugins/executor_basic.go index a2e59373d..67a4b4fb8 100644 --- a/client/driver/plugins/executor_basic.go +++ b/client/driver/plugins/executor_basic.go @@ -3,30 +3,96 @@ package plugins import ( + "fmt" + "log" + "os" "os/exec" + "path/filepath" + "runtime" + "syscall" "time" + + "github.com/hashicorp/nomad/client/allocdir" ) type BasicExecutor struct { + logger *log.Logger + cmd exec.Cmd } -func NewExecutor() Executor { - return &BasicExecutor{} +func NewExecutor(logger *log.Logger) Executor { + return &BasicExecutor{logger: logger} } -func (e *BasicExecutor) LaunchCmd(cmd *exec.Cmd, ctx *ExecutorContext) (*ProcessState, error) { +func (e *BasicExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) { + e.cmd.Path = command.Cmd + e.cmd.Args = append([]string{command.Cmd}, command.Args...) + e.cmd.Path = ctx.TaskEnv.ReplaceEnv(e.cmd.Path) + e.cmd.Args = ctx.TaskEnv.ParseAndReplace(e.cmd.Args) + + if filepath.Base(command.Cmd) == command.Cmd { + if lp, err := exec.LookPath(command.Cmd); err != nil { + } else { + e.cmd.Path = lp + } + } + e.configureTaskDir(ctx.Task.Name, ctx.AllocDir) + e.cmd.Env = ctx.TaskEnv.EnvList() + stdoPath := filepath.Join(e.cmd.Dir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", ctx.Task.Name)) + stdo, err := os.OpenFile(stdoPath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + e.cmd.Stdout = stdo + + stdePath := filepath.Join(e.cmd.Dir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", ctx.Task.Name)) + stde, err := os.OpenFile(stdePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + e.cmd.Stderr = stde + if err := e.cmd.Start(); err != nil { + return nil, err + } + return &ProcessState{Pid: 5, ExitCode: -1, Time: time.Now()}, nil } func (e *BasicExecutor) Wait() (*ProcessState, error) { - time.Sleep(5 * time.Second) - return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil + err := e.cmd.Wait() + exitCode := 1 + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCode = status.ExitStatus() + } + } + return &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}, nil } -func (e *BasicExecutor) Exit() (*ProcessState, error) { - return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil +func (e *BasicExecutor) Exit() error { + proc, err := os.FindProcess(e.cmd.Process.Pid) + if err != nil { + return fmt.Errorf("failied to find user process %v: %v", e.cmd.Process.Pid, err) + } + return proc.Kill() } -func (e *BasicExecutor) ShutDown() (*ProcessState, error) { - return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil +func (e *BasicExecutor) ShutDown() error { + proc, err := os.FindProcess(e.cmd.Process.Pid) + if err != nil { + return err + } + if runtime.GOOS == "windows" { + return proc.Kill() + } + return proc.Signal(os.Interrupt) +} + +func (e *BasicExecutor) configureTaskDir(taskName string, allocDir *allocdir.AllocDir) error { + taskDir, ok := allocDir.TaskDirs[taskName] + if !ok { + return fmt.Errorf("Couldn't find task directory for task %v", taskName) + } + e.cmd.Dir = taskDir + return nil } diff --git a/client/driver/plugins/executor_linux.go b/client/driver/plugins/executor_linux.go index 53df9b646..3ad2acc3c 100644 --- a/client/driver/plugins/executor_linux.go +++ b/client/driver/plugins/executor_linux.go @@ -1,23 +1,34 @@ package plugins import ( + "fmt" "log" "os/exec" + "path/filepath" "time" ) type LinuxExecutor struct { - cmd *exec.Cmd ctx *ExecutorContext - log *log.Logger + logger *log.Logger } -func NewExecutor() Executor { - return &LinuxExecutor{} +func NewExecutor(logger *log.Logger) Executor { + return &LinuxExecutor{logger: logger} } -func (e *LinuxExecutor) LaunchCmd(cmd *exec.Cmd, ctx *ExecutorContext) (*ProcessState, error) { +func (e *LinuxExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) { + var cmd exec.Cmd + cmd.Path = command.Cmd + cmd.Args = append([]string{name}, args...) + if filepath.Base(command.Cmd) == command.Cmd { + if lp, err := exec.LookPath(command.Cmd); err != nil { + } else { + cmd.Path = lp + } + } + cmd.Env = ctx.TaskEnv.EnvList() return &ProcessState{Pid: 5, ExitCode: -1, Time: time.Now()}, nil } @@ -26,10 +37,10 @@ func (e *LinuxExecutor) Wait() (*ProcessState, error) { return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil } -func (e *LinuxExecutor) Exit() (*ProcessState, error) { - return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil +func (e *LinuxExecutor) Exit() error { + return nil } func (e *LinuxExecutor) ShutDown() (*ProcessState, error) { - return &ProcessState{Pid: 0, ExitCode: 0, Time: time.Now()}, nil + return nil } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 5c870e80f..502d2fc37 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -4,15 +4,18 @@ import ( "encoding/json" "fmt" "log" + "os/exec" "path/filepath" "time" + "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/driver/executor" + "github.com/hashicorp/nomad/client/driver/plugins" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" + "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" ) @@ -32,11 +35,12 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type rawExecHandle struct { - cmd executor.Executor - killTimeout time.Duration - logger *log.Logger - waitCh chan *cstructs.WaitResult - doneCh chan struct{} + pluginClient *plugin.Client + executor plugins.Executor + killTimeout time.Duration + logger *log.Logger + waitCh chan *cstructs.WaitResult + doneCh chan struct{} } // NewRawExecDriver is used to create a new raw exec driver @@ -90,40 +94,62 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl } } - // Setup the command - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd := executor.NewBasicExecutor(execCtx) - executor.SetCommand(cmd, command, driverConfig.Args) - if err := cmd.Limit(task.Resources); err != nil { - return nil, fmt.Errorf("failed to constrain resources: %s", err) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + pluginConfig := &plugin.ClientConfig{ + HandshakeConfig: plugins.HandshakeConfig, + Plugins: plugins.PluginMap, + Cmd: exec.Command(bin, "executor"), } - // Populate environment variables - cmd.Command().Env = d.taskEnv.EnvList() - - if err := cmd.ConfigureTaskDir(d.taskName, ctx.AllocDir); err != nil { - return nil, fmt.Errorf("failed to configure task directory: %v", err) + executor, pluginClient, err := d.executor(pluginConfig) + if err != nil { + return nil, err } - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start command: %v", err) + executorCtx := &plugins.ExecutorContext{ + TaskEnv: d.taskEnv, + AllocDir: ctx.AllocDir, + Task: task, } + ps, err := executor.LaunchCmd(&plugins.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx) + if err != nil { + return nil, fmt.Errorf("error starting process via the plugin: %v", err) + } + d.logger.Printf("DIPTANU Started process via plugin: %#v", ps) // Return a driver handle - h := &execHandle{ - cmd: cmd, - killTimeout: d.DriverContext.KillTimeout(task), - logger: d.logger, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + h := &rawExecHandle{ + pluginClient: pluginClient, + executor: executor, + killTimeout: d.DriverContext.KillTimeout(task), + logger: d.logger, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } +func (d *RawExecDriver) executor(config *plugin.ClientConfig) (plugins.Executor, *plugin.Client, error) { + executorClient := plugin.NewClient(config) + rpcClient, err := executorClient.Client() + if err != nil { + return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err) + } + rpcClient.SyncStreams(d.config.LogOutput, d.config.LogOutput) + + raw, err := rpcClient.Dispense("executor") + if err != nil { + return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err) + } + executorPlugin := raw.(plugins.Executor) + return executorPlugin, executorClient, nil +} type rawExecId struct { - ExecutorId string - KillTimeout time.Duration + KillTimeout time.Duration + PluginConfig *plugin.ReattachConfig } func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { @@ -132,30 +158,39 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - // Find the process - execCtx := executor.NewExecutorContext(d.taskEnv) - cmd := executor.NewBasicExecutor(execCtx) - if err := cmd.Open(id.ExecutorId); err != nil { - return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) + bin, err := discover.NomadExecutable() + if err != nil { + return nil, fmt.Errorf("unable to find the nomad binary: %v", err) + } + + pluginConfig := &plugin.ClientConfig{ + HandshakeConfig: plugins.HandshakeConfig, + Plugins: plugins.PluginMap, + Cmd: exec.Command(bin, "executor"), + Reattach: id.PluginConfig, + } + executor, client, err := d.executor(pluginConfig) + if err != nil { + return nil, fmt.Errorf("error connecting to plugin: %v", err) } // Return a driver handle h := &execHandle{ - cmd: cmd, - logger: d.logger, - killTimeout: id.KillTimeout, - doneCh: make(chan struct{}), - waitCh: make(chan *cstructs.WaitResult, 1), + pluginClient: client, + executor: executor, + logger: d.logger, + killTimeout: id.KillTimeout, + doneCh: make(chan struct{}), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil } func (h *rawExecHandle) ID() string { - executorId, _ := h.cmd.ID() id := rawExecId{ - ExecutorId: executorId, - KillTimeout: h.killTimeout, + KillTimeout: h.killTimeout, + PluginConfig: h.pluginClient.ReattachConfig(), } data, err := json.Marshal(id) @@ -178,18 +213,20 @@ func (h *rawExecHandle) Update(task *structs.Task) error { } func (h *rawExecHandle) Kill() error { - h.cmd.Shutdown() + h.executor.ShutDown() select { case <-h.doneCh: return nil case <-time.After(h.killTimeout): - return h.cmd.ForceStop() + err := h.executor.Exit() + return err } } func (h *rawExecHandle) run() { - res := h.cmd.Wait() + ps, err := h.executor.Wait() close(h.doneCh) - h.waitCh <- res + h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + h.pluginClient.Kill() }