From 6eba0e46ec467cf1c6083df6e6f4aa31e7a8962e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Feb 2016 11:54:54 -0800 Subject: [PATCH] Killing the plugin after wait returns --- client/driver/exec.go | 8 +- client/driver/plugins/executor.go | 124 ---------------------- client/driver/plugins/executor_plugin.go | 125 +++++++++++++++++++++++ 3 files changed, 126 insertions(+), 131 deletions(-) create mode 100644 client/driver/plugins/executor_plugin.go diff --git a/client/driver/exec.go b/client/driver/exec.go index e80134c3e..bf696f31a 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -151,13 +151,6 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } - // Find the process - // execCtx := executor.NewExecutorContext(d.taskEnv) - // cmd, err := executor.OpenId(execCtx, id.ExecutorId) - // if err != nil { - // return nil, fmt.Errorf("failed to open ID %v: %v", id.ExecutorId, err) - // } - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) @@ -244,4 +237,5 @@ func (h *execHandle) run() { close(h.doneCh) h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, Err: err} close(h.waitCh) + h.pluginClient.Kill() } diff --git a/client/driver/plugins/executor.go b/client/driver/plugins/executor.go index 10530f6e9..d5c343e19 100644 --- a/client/driver/plugins/executor.go +++ b/client/driver/plugins/executor.go @@ -1,125 +1 @@ package plugins - -import ( - "log" - "net/rpc" - "os" - "time" - - "github.com/hashicorp/go-plugin" - - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/driver/env" - "github.com/hashicorp/nomad/nomad/structs" -) - -var HandshakeConfig = plugin.HandshakeConfig{ - ProtocolVersion: 1, - MagicCookieKey: "executor_plugin", - MagicCookieValue: "value", -} - -var PluginMap = map[string]plugin.Plugin{ - "executor": new(ExecutorPlugin), -} - -type ExecutorContext struct { - TaskEnv *env.TaskEnvironment - AllocDir *allocdir.AllocDir - Task *structs.Task - Chroot bool - Limits bool -} - -type ExecCommand struct { - Cmd string - Args []string -} - -type ProcessState struct { - Pid int - ExitCode int - Time time.Time -} - -type Executor interface { - LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) - Wait() (*ProcessState, error) - ShutDown() error - Exit() error -} - -type ExecutorRPC struct { - client *rpc.Client -} - -type LaunchCmdArgs struct { - Cmd *ExecCommand - Ctx *ExecutorContext -} - -func (e *ExecutorRPC) LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) { - var ps ProcessState - err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) - return &ps, err -} - -func (e *ExecutorRPC) Wait() (*ProcessState, error) { - var ps ProcessState - err := e.client.Call("Plugin.Wait", new(interface{}), &ps) - return &ps, err -} - -func (e *ExecutorRPC) ShutDown() error { - var ps ProcessState - err := e.client.Call("Plugin.ShutDown", new(interface{}), &ps) - return err -} - -func (e *ExecutorRPC) Exit() error { - var ps ProcessState - err := e.client.Call("Plugin.Exit", new(interface{}), &ps) - return err -} - -type ExecutorRPCServer struct { - Impl Executor -} - -func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *ProcessState) error { - var err error - ps, err = e.Impl.LaunchCmd(args.Cmd, args.Ctx) - return err -} - -func (e *ExecutorRPCServer) Wait(args interface{}, ps *ProcessState) error { - var err error - ps, err = e.Impl.Wait() - return err -} - -func (e *ExecutorRPCServer) ShutDown(args interface{}, ps *ProcessState) error { - var err error - err = e.Impl.ShutDown() - return err -} - -func (e *ExecutorRPCServer) Exit(args interface{}, ps *ProcessState) error { - var err error - err = e.Impl.Exit() - return err -} - -type ExecutorPlugin struct { - logger *log.Logger -} - -func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { - p.logger = log.New(os.Stdout, "executor-plugin-server:", log.LstdFlags) - return &ExecutorRPCServer{Impl: NewExecutor(p.logger)}, nil -} - -func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { - p.logger = log.New(os.Stdout, "executor-plugin-client:", log.LstdFlags) - return &ExecutorRPC{client: c}, nil -} diff --git a/client/driver/plugins/executor_plugin.go b/client/driver/plugins/executor_plugin.go new file mode 100644 index 000000000..10530f6e9 --- /dev/null +++ b/client/driver/plugins/executor_plugin.go @@ -0,0 +1,125 @@ +package plugins + +import ( + "log" + "net/rpc" + "os" + "time" + + "github.com/hashicorp/go-plugin" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/driver/env" + "github.com/hashicorp/nomad/nomad/structs" +) + +var HandshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "executor_plugin", + MagicCookieValue: "value", +} + +var PluginMap = map[string]plugin.Plugin{ + "executor": new(ExecutorPlugin), +} + +type ExecutorContext struct { + TaskEnv *env.TaskEnvironment + AllocDir *allocdir.AllocDir + Task *structs.Task + Chroot bool + Limits bool +} + +type ExecCommand struct { + Cmd string + Args []string +} + +type ProcessState struct { + Pid int + ExitCode int + Time time.Time +} + +type Executor interface { + LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) + Wait() (*ProcessState, error) + ShutDown() error + Exit() error +} + +type ExecutorRPC struct { + client *rpc.Client +} + +type LaunchCmdArgs struct { + Cmd *ExecCommand + Ctx *ExecutorContext +} + +func (e *ExecutorRPC) LaunchCmd(cmd *ExecCommand, ctx *ExecutorContext) (*ProcessState, error) { + var ps ProcessState + err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) + return &ps, err +} + +func (e *ExecutorRPC) Wait() (*ProcessState, error) { + var ps ProcessState + err := e.client.Call("Plugin.Wait", new(interface{}), &ps) + return &ps, err +} + +func (e *ExecutorRPC) ShutDown() error { + var ps ProcessState + err := e.client.Call("Plugin.ShutDown", new(interface{}), &ps) + return err +} + +func (e *ExecutorRPC) Exit() error { + var ps ProcessState + err := e.client.Call("Plugin.Exit", new(interface{}), &ps) + return err +} + +type ExecutorRPCServer struct { + Impl Executor +} + +func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *ProcessState) error { + var err error + ps, err = e.Impl.LaunchCmd(args.Cmd, args.Ctx) + return err +} + +func (e *ExecutorRPCServer) Wait(args interface{}, ps *ProcessState) error { + var err error + ps, err = e.Impl.Wait() + return err +} + +func (e *ExecutorRPCServer) ShutDown(args interface{}, ps *ProcessState) error { + var err error + err = e.Impl.ShutDown() + return err +} + +func (e *ExecutorRPCServer) Exit(args interface{}, ps *ProcessState) error { + var err error + err = e.Impl.Exit() + return err +} + +type ExecutorPlugin struct { + logger *log.Logger +} + +func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { + p.logger = log.New(os.Stdout, "executor-plugin-server:", log.LstdFlags) + return &ExecutorRPCServer{Impl: NewExecutor(p.logger)}, nil +} + +func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { + p.logger = log.New(os.Stdout, "executor-plugin-client:", log.LstdFlags) + return &ExecutorRPC{client: c}, nil +}