open-nomad/plugins/executor/executor_plugin.go

186 lines
4.6 KiB
Go
Raw Normal View History

2018-11-29 13:59:53 +00:00
package executor
2016-02-03 19:54:54 +00:00
import (
"encoding/gob"
2016-02-03 19:54:54 +00:00
"net/rpc"
2016-10-10 18:46:27 +00:00
"os"
"syscall"
"time"
2016-02-03 19:54:54 +00:00
hclog "github.com/hashicorp/go-hclog"
2016-02-03 19:54:54 +00:00
"github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor"
2016-02-03 19:54:54 +00:00
)
// Registering these types since we have to serialize and de-serialize the Task
// structs over the wire between drivers and the executor.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]string{})
2016-03-22 00:44:37 +00:00
gob.Register([]map[string]int{})
2016-10-10 18:46:27 +00:00
gob.Register(syscall.Signal(0x1))
}
2016-02-03 19:54:54 +00:00
type ExecutorRPC struct {
client *rpc.Client
logger hclog.Logger
2016-02-03 19:54:54 +00:00
}
2016-02-05 00:38:17 +00:00
// LaunchCmdArgs wraps a user command and the args for the purposes of RPC
type LaunchArgs struct {
2016-02-05 00:03:17 +00:00
Cmd *executor.ExecCommand
}
// ShutdownArgs wraps shutdown signal and grace period
type ShutdownArgs struct {
Signal string
GracePeriod time.Duration
}
type ExecArgs struct {
Deadline time.Time
Name string
Args []string
}
type ExecReturn struct {
Output []byte
Code int
}
func (e *ExecutorRPC) Launch(cmd *executor.ExecCommand) (*executor.ProcessState, error) {
2016-02-05 00:03:17 +00:00
var ps *executor.ProcessState
err := e.client.Call("Plugin.Launch", LaunchArgs{Cmd: cmd}, &ps)
return ps, err
2016-02-03 19:54:54 +00:00
}
2016-02-05 00:03:17 +00:00
func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) {
var ps executor.ProcessState
2016-02-03 19:54:54 +00:00
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
return &ps, err
}
func (e *ExecutorRPC) Kill() error {
return e.client.Call("Plugin.Kill", new(interface{}), new(interface{}))
2016-02-03 19:54:54 +00:00
}
func (e *ExecutorRPC) Shutdown(signal string, grace time.Duration) error {
return e.client.Call("Plugin.Shutdown", &ShutdownArgs{signal, grace}, new(interface{}))
2016-10-12 18:35:29 +00:00
}
func (e *ExecutorRPC) UpdateResources(resources *executor.Resources) error {
return e.client.Call("Plugin.UpdateResources", resources, new(interface{}))
}
2016-03-30 05:05:02 +00:00
func (e *ExecutorRPC) Version() (*executor.ExecutorVersion, error) {
var version executor.ExecutorVersion
2016-03-29 23:27:31 +00:00
err := e.client.Call("Plugin.Version", new(interface{}), &version)
2016-03-30 05:05:02 +00:00
return &version, err
2016-03-29 23:27:31 +00:00
}
2016-04-28 23:06:01 +00:00
func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
var resourceUsage cstructs.TaskResourceUsage
err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage)
return &resourceUsage, err
}
2016-10-10 18:46:27 +00:00
func (e *ExecutorRPC) Signal(s os.Signal) error {
return e.client.Call("Plugin.Signal", &s, new(interface{}))
}
func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
req := ExecArgs{
Deadline: deadline,
Name: name,
Args: args,
}
var resp *ExecReturn
err := e.client.Call("Plugin.Exec", req, &resp)
if resp == nil {
return nil, 0, err
}
return resp.Output, resp.Code, err
}
2016-02-03 19:54:54 +00:00
type ExecutorRPCServer struct {
2016-03-30 05:05:02 +00:00
Impl executor.Executor
logger hclog.Logger
2016-02-03 19:54:54 +00:00
}
func (e *ExecutorRPCServer) Launch(args LaunchArgs, ps *executor.ProcessState) error {
state, err := e.Impl.Launch(args.Cmd)
2016-02-04 19:51:43 +00:00
if state != nil {
*ps = *state
}
2016-02-03 19:54:54 +00:00
return err
}
2016-02-05 00:03:17 +00:00
func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error {
2016-02-04 19:51:43 +00:00
state, err := e.Impl.Wait()
if state != nil {
*ps = *state
}
2016-02-03 19:54:54 +00:00
return err
}
func (e *ExecutorRPCServer) Shutdown(args ShutdownArgs, resp *interface{}) error {
return e.Impl.Shutdown(args.Signal, args.GracePeriod)
}
func (e *ExecutorRPCServer) UpdateResources(args *executor.Resources, resp *interface{}) error {
return e.Impl.UpdateResources(args)
}
2016-03-30 05:05:02 +00:00
func (e *ExecutorRPCServer) Version(args interface{}, version *executor.ExecutorVersion) error {
ver, err := e.Impl.Version()
if ver != nil {
*version = *ver
}
return err
2016-03-29 23:27:31 +00:00
}
2016-04-28 23:06:01 +00:00
func (e *ExecutorRPCServer) Stats(args interface{}, resourceUsage *cstructs.TaskResourceUsage) error {
ru, err := e.Impl.Stats()
if ru != nil {
*resourceUsage = *ru
}
return err
}
2016-10-10 18:46:27 +00:00
func (e *ExecutorRPCServer) Signal(args os.Signal, resp *interface{}) error {
return e.Impl.Signal(args)
}
func (e *ExecutorRPCServer) Exec(args ExecArgs, result *ExecReturn) error {
out, code, err := e.Impl.Exec(args.Deadline, args.Name, args.Args)
ret := &ExecReturn{
Output: out,
Code: code,
}
*result = *ret
return err
}
2016-02-03 19:54:54 +00:00
type ExecutorPlugin struct {
logger hclog.Logger
fsIsolation bool
Impl *ExecutorRPCServer
2016-02-03 19:54:54 +00:00
}
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
if p.fsIsolation {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutorWithIsolation(p.logger), logger: p.logger}
} else {
p.Impl = &ExecutorRPCServer{Impl: executor.NewExecutor(p.logger), logger: p.logger}
}
}
return p.Impl, nil
2016-02-03 19:54:54 +00:00
}
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
2016-03-29 23:27:31 +00:00
return &ExecutorRPC{client: c, logger: p.logger}, nil
2016-02-03 19:54:54 +00:00
}