driver: add pre09 migration logic

This commit is contained in:
Nick Ethier 2019-01-14 12:25:59 -05:00
parent 3104468c89
commit f626b6bf3d
No known key found for this signature in database
GPG Key ID: 07C1A3ECED90D24A
19 changed files with 289 additions and 177 deletions

View File

@ -307,6 +307,11 @@ func (d *AllocDir) UnmountAll() error {
fmt.Errorf("failed to remove the secret dir %q: %v", dir.SecretsDir, err)) fmt.Errorf("failed to remove the secret dir %q: %v", dir.SecretsDir, err))
} }
} }
// Unmount dev/ and proc/ have been mounted.
if err := dir.unmountSpecialDirs(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} }
return mErr.ErrorOrNil() return mErr.ErrorOrNil()

View File

@ -0,0 +1,36 @@
package allocdir
import (
"fmt"
"os"
"path/filepath"
multierror "github.com/hashicorp/go-multierror"
)
// unmountSpecialDirs unmounts the dev and proc file system from the chroot. No
// error is returned if the directories do not exist or have already been
// unmounted.
func (t *TaskDir) unmountSpecialDirs() error {
errs := new(multierror.Error)
dev := filepath.Join(t.Dir, "dev")
if pathExists(dev) {
if err := unlinkDir(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount dev %q: %v", dev, err))
} else if err := os.RemoveAll(dev); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete dev directory %q: %v", dev, err))
}
}
// Unmount proc.
proc := filepath.Join(t.Dir, "proc")
if pathExists(proc) {
if err := unlinkDir(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to unmount proc %q: %v", proc, err))
} else if err := os.RemoveAll(proc); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to delete proc directory %q: %v", dev, err))
}
}
return errs.ErrorOrNil()
}

View File

@ -0,0 +1,8 @@
// +build !linux
package allocdir
// currently a noop on non-Linux platforms
func (d *TaskDir) unmountSpecialDirs() error {
return nil
}

View File

@ -1095,7 +1095,7 @@ func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
// Look up device statistics lazily when fetched, as currently we do not emit any stats for them yet // Look up device statistics lazily when fetched, as currently we do not emit any stats for them yet
if ru != nil && tr.deviceStatsReporter != nil { if ru != nil && tr.deviceStatsReporter != nil {
deviceResources := tr.Alloc().AllocatedResources.Tasks[tr.taskName].Devices deviceResources := tr.taskResources.Devices
ru.ResourceUsage.DeviceStats = tr.deviceStatsReporter.LatestDeviceResourceStats(deviceResources) ru.ResourceUsage.DeviceStats = tr.deviceStatsReporter.LatestDeviceResourceStats(deviceResources)
} }
return ru return ru

View File

@ -1,9 +1,13 @@
package state package state
import ( import (
"encoding/json"
"fmt"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
) )
// allocRunnerMutableState08 is state that had to be written on each save as it // allocRunnerMutableState08 is state that had to be written on each save as it
@ -40,7 +44,23 @@ type taskRunnerState08 struct {
//CreatedResources *driver.CreatedResources //CreatedResources *driver.CreatedResources
} }
func (t *taskRunnerState08) Upgrade() *state.LocalState { type taskRunnerHandle08 struct {
PluginConfig struct {
Pid int `json:"Pid"`
AddrNet string `json:"AddrNet"`
AddrName string `json:"AddrName"`
} `json:"PluginConfig"`
}
func (t *taskRunnerHandle08) reattachConfig() *shared.ReattachConfig {
return &shared.ReattachConfig{
Network: t.PluginConfig.AddrNet,
Addr: t.PluginConfig.AddrName,
Pid: t.PluginConfig.Pid,
}
}
func (t *taskRunnerState08) Upgrade(allocID, taskName string) *state.LocalState {
ls := state.NewLocalState() ls := state.NewLocalState()
// Reuse DriverNetwork // Reuse DriverNetwork
@ -56,22 +76,40 @@ func (t *taskRunnerState08) Upgrade() *state.LocalState {
PrestartDone: t.TaskDirBuilt, PrestartDone: t.TaskDirBuilt,
} }
// Don't need logmon in pre09 tasks
ls.Hooks["logmon"] = &state.HookState{
PrestartDone: true,
}
// Upgrade dispatch payload state // Upgrade dispatch payload state
ls.Hooks["dispatch_payload"] = &state.HookState{ ls.Hooks["dispatch_payload"] = &state.HookState{
PrestartDone: t.PayloadRendered, PrestartDone: t.PayloadRendered,
} }
//TODO How to convert handles?! This does not work. //TODO How to convert handles?! This does not work.
ls.TaskHandle = drivers.NewTaskHandle("TODO") ls.TaskHandle = drivers.NewTaskHandle(0)
//TODO where do we get this from? //TODO where do we get this from?
ls.TaskHandle.Config = nil ls.TaskHandle.Config = &drivers.TaskConfig{
Name: taskName,
AllocID: allocID,
}
//TODO do we need to se this accurately? Or will RecoverTask handle it? //TODO do we need to se this accurately? Or will RecoverTask handle it?
ls.TaskHandle.State = drivers.TaskStateUnknown ls.TaskHandle.State = drivers.TaskStateUnknown
//TODO do we need an envelope so drivers know this is an old state? // A ReattachConfig to the pre09 executor is sent
ls.TaskHandle.SetDriverState(t.HandleID) var raw []byte
var handle taskRunnerHandle08
if err := json.Unmarshal([]byte(t.HandleID), &handle); err != nil {
fmt.Println("ERR: ", err)
}
raw, err := json.Marshal(handle.reattachConfig())
if err != nil {
fmt.Println("ERR: ", err)
}
ls.TaskHandle.DriverState = raw
return ls return ls
} }

View File

@ -211,7 +211,7 @@ func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, al
} }
// Convert 0.8 task state to 0.9 task state // Convert 0.8 task state to 0.9 task state
localTaskState := oldState.Upgrade() localTaskState := oldState.Upgrade(allocID, taskName)
// Insert the new task state // Insert the new task state
if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil { if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {

View File

@ -2,6 +2,7 @@ package exec
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -28,6 +29,10 @@ const (
// fingerprintPeriod is the interval at which the driver will send fingerprint responses // fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second fingerprintPeriod = 30 * time.Second
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
) )
var ( var (
@ -218,6 +223,21 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("handle cannot be nil") return fmt.Errorf("handle cannot be nil")
} }
// pre 0.9 upgrade path check
if handle.Version == 0 {
var reattach shared.ReattachConfig
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
return err
}
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
if err != nil {
return err
}
return d.recoverPre0_9Task(handle.Config, reattachConfig)
}
// If already attached to handle there's nothing to recover. // If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok { if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists", d.logger.Trace("nothing to recover; task already exists",
@ -241,7 +261,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to build ReattachConfig from task state: %v", err) return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
} }
exec, pluginClient, err := executor.CreateExecutorWithConfig(plugRC, exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil { if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
@ -275,7 +295,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
} }
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")

View File

@ -0,0 +1,36 @@
package exec
import (
"fmt"
"time"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
return fmt.Errorf("failed to reattach to executor: %v", err)
}
h := &taskHandle{
exec: exec,
pid: reattach.Pid,
pluginClient: pluginClient,
taskConfig: config,
procState: drivers.TaskStateRunning,
startedAt: time.Now(),
exitResult: &drivers.ExitResult{},
}
d.tasks.Set(config.ID, h)
go h.run()
return nil
}

View File

@ -263,7 +263,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
} }
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(plugRC, execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil { if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
@ -309,7 +309,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args) d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args)
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(1)
handle.Config = cfg handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")

View File

@ -419,7 +419,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
driverState := MockTaskState{ driverState := MockTaskState{
StartedAt: h.startedAt, StartedAt: h.startedAt,
} }
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(1)
handle.Config = cfg handle.Config = cfg
if err := handle.SetDriverState(&driverState); err != nil { if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name) d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name)

View File

@ -261,7 +261,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
} }
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(plugRC, execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil { if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
@ -295,7 +295,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
} }
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(1)
handle.Config = cfg handle.Config = cfg
// Get the image source // Get the image source

View File

@ -269,7 +269,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
} }
// Create client for reattached executor // Create client for reattached executor
exec, pluginClient, err := executor.CreateExecutorWithConfig(plugRC, exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil { if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
@ -303,7 +303,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
} }
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(1)
handle.Config = cfg handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")

View File

@ -365,7 +365,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
} }
execImpl, pluginClient, err := executor.CreateExecutorWithConfig(plugRC, execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil { if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
@ -408,7 +408,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
} }
handle := drivers.NewTaskHandle(pluginName) handle := drivers.NewTaskHandle(1)
handle.Config = cfg handle.Config = cfg
// todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this // todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this

View File

@ -1,110 +0,0 @@
package legacy
import (
"encoding/gob"
"net/rpc"
"os"
"syscall"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// Registering these types since we have to serialize and de-serialize the Task
// structs over the wire between drivers and the executor.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]string{})
gob.Register([]map[string]int{})
gob.Register(syscall.Signal(0x1))
}
type ExecutorRPC struct {
client *rpc.Client
logger hclog.Logger
}
type ProcessState struct {
Pid int
ExitCode int
Signal int
Time time.Time
}
type ExecutorVersion struct {
Version string
}
type ExecCmdArgs struct {
Deadline time.Time
Name string
Args []string
}
type ExecCmdReturn struct {
Output []byte
Code int
}
func (e *ExecutorRPC) Wait() (*ProcessState, error) {
var ps ProcessState
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
return &ps, err
}
func (e *ExecutorRPC) ShutDown() error {
return e.client.Call("Plugin.ShutDown", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *ExecutorRPC) Version() (*ExecutorVersion, error) {
var version ExecutorVersion
err := e.client.Call("Plugin.Version", new(interface{}), &version)
return &version, err
}
func (e *ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
var resourceUsage cstructs.TaskResourceUsage
err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage)
return &resourceUsage, err
}
func (e *ExecutorRPC) Signal(s os.Signal) error {
return e.client.Call("Plugin.Signal", &s, new(interface{}))
}
func (e *ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
req := ExecCmdArgs{
Deadline: deadline,
Name: name,
Args: args,
}
var resp *ExecCmdReturn
err := e.client.Call("Plugin.Exec", req, &resp)
if resp == nil {
return nil, 0, err
}
return resp.Output, resp.Code, err
}
type ExecutorPlugin struct {
logger hclog.Logger
}
func NewExecutorPlugin(logger hclog.Logger) plugin.Plugin {
return &ExecutorPlugin{logger: logger}
}
func (p *ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &ExecutorRPC{client: c, logger: p.logger}, nil
}
func (p *ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
panic("client only supported")
}

View File

@ -1,18 +1,31 @@
package executor package executor
import ( import (
"encoding/gob"
"net/rpc"
"os" "os"
"syscall"
"time" "time"
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/executor/legacy"
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
// Registering these types since we have to serialize and de-serialize the Task
// structs over the wire between drivers and the executor.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]string{})
gob.Register([]map[string]int{})
gob.Register(syscall.Signal(0x1))
}
type legacyExecutorWrapper struct { type legacyExecutorWrapper struct {
client *legacy.ExecutorRPC client *pre09ExecutorRPC
logger hclog.Logger logger hclog.Logger
} }
@ -81,6 +94,9 @@ func (l *legacyExecutorWrapper) handleStats(ctx context.Context, interval time.D
for range ticker.C { for range ticker.C {
stats, err := l.client.Stats() stats, err := l.client.Stats()
if err != nil { if err != nil {
if err == rpc.ErrShutdown {
return
}
l.logger.Warn("stats collection from legacy executor failed, waiting for next interval", "error", err) l.logger.Warn("stats collection from legacy executor failed, waiting for next interval", "error", err)
continue continue
} }
@ -101,3 +117,82 @@ func (l *legacyExecutorWrapper) Signal(s os.Signal) error {
func (l *legacyExecutorWrapper) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) { func (l *legacyExecutorWrapper) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) {
return l.client.Exec(deadline, cmd, args) return l.client.Exec(deadline, cmd, args)
} }
type pre09ExecutorRPC struct {
client *rpc.Client
logger hclog.Logger
}
type pre09ExecCmdArgs struct {
Deadline time.Time
Name string
Args []string
}
type pre09ExecCmdReturn struct {
Output []byte
Code int
}
func (e *pre09ExecutorRPC) Wait() (*ProcessState, error) {
var ps ProcessState
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
return &ps, err
}
func (e *pre09ExecutorRPC) ShutDown() error {
return e.client.Call("Plugin.ShutDown", new(interface{}), new(interface{}))
}
func (e *pre09ExecutorRPC) Exit() error {
return e.client.Call("Plugin.Exit", new(interface{}), new(interface{}))
}
func (e *pre09ExecutorRPC) Version() (*ExecutorVersion, error) {
var version ExecutorVersion
err := e.client.Call("Plugin.Version", new(interface{}), &version)
return &version, err
}
func (e *pre09ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
var resourceUsage cstructs.TaskResourceUsage
err := e.client.Call("Plugin.Stats", new(interface{}), &resourceUsage)
return &resourceUsage, err
}
func (e *pre09ExecutorRPC) Signal(s os.Signal) error {
return e.client.Call("Plugin.Signal", &s, new(interface{}))
}
func (e *pre09ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
req := pre09ExecCmdArgs{
Deadline: deadline,
Name: name,
Args: args,
}
var resp *pre09ExecCmdReturn
err := e.client.Call("Plugin.Exec", req, &resp)
if resp == nil {
return nil, 0, err
}
return resp.Output, resp.Code, err
}
type pre09ExecutorPlugin struct {
logger hclog.Logger
}
func newPre09ExecutorPlugin(logger hclog.Logger) plugin.Plugin {
return &pre09ExecutorPlugin{logger: logger}
}
func (p *pre09ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &legacyExecutorWrapper{
client: &pre09ExecutorRPC{client: c, logger: p.logger},
logger: p.logger,
}, nil
}
func (p *pre09ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
panic("client only supported")
}

View File

@ -5,7 +5,6 @@ import (
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin" plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor/legacy"
) )
// ExecutorConfig is the config that Nomad passes to the executor // ExecutorConfig is the config that Nomad passes to the executor
@ -31,17 +30,9 @@ func GetPluginMap(logger hclog.Logger, fsIsolation bool) map[string]plugin.Plugi
} }
} }
func GetVersionedPluginMap(logger hclog.Logger, fsIsolation bool) map[int]map[string]plugin.Plugin { func GetPre09PluginMap(logger hclog.Logger, fsIsolation bool) map[string]plugin.Plugin {
return map[int]map[string]plugin.Plugin{ return map[string]plugin.Plugin{
1: { "executor": newPre09ExecutorPlugin(logger),
"executor": legacy.NewExecutorPlugin(logger),
},
2: {
"executor": ExecutorPlugin{
logger: logger,
fsIsolation: fsIsolation,
},
},
} }
} }

View File

@ -64,37 +64,36 @@ func CreateExecutor(logger hclog.Logger, driverConfig *base.ClientDriverConfig,
isolateCommand(config.Cmd) isolateCommand(config.Cmd)
} }
executorClient := plugin.NewClient(config) return newExecutorClient(config, logger)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(Executor)
return executorPlugin, executorClient, nil
} }
// CreateExecutorWithConfig launches a plugin with a given plugin config // ReattachToExecutor launches a plugin with a given plugin config
func CreateExecutorWithConfig(reattachConfig *plugin.ReattachConfig, logger hclog.Logger) (Executor, *plugin.Client, error) { func ReattachToExecutor(reattachConfig *plugin.ReattachConfig, logger hclog.Logger) (Executor, *plugin.Client, error) {
p := &ExecutorPlugin{
logger: logger,
}
config := &plugin.ClientConfig{ config := &plugin.ClientConfig{
HandshakeConfig: base.Handshake, HandshakeConfig: base.Handshake,
Reattach: reattachConfig, Reattach: reattachConfig,
Plugins: map[string]plugin.Plugin{"executor": p}, Plugins: GetPluginMap(logger, false),
// TODO: Use versioned plugin map to support backwards compatibility with
// existing pre-0.9 executors
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: logger.Named("executor"), Logger: logger.Named("executor"),
} }
return newExecutorClient(config, logger)
}
// ReattachToPre09Executor creates a plugin client that reattaches to an existing
// pre 0.9 Nomad executor
func ReattachToPre09Executor(reattachConfig *plugin.ReattachConfig, logger hclog.Logger) (Executor, *plugin.Client, error) {
config := &plugin.ClientConfig{
HandshakeConfig: base.Handshake,
Reattach: reattachConfig,
Plugins: GetPre09PluginMap(logger, false),
AllowedProtocols: []plugin.Protocol{plugin.ProtocolNetRPC},
Logger: logger.Named("executor"),
}
return newExecutorClient(config, logger)
}
func newExecutorClient(config *plugin.ClientConfig, logger hclog.Logger) (Executor, *plugin.Client, error) {
executorClient := plugin.NewClient(config) executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client() rpcClient, err := executorClient.Client()
if err != nil { if err != nil {

View File

@ -1,20 +1,16 @@
package catalog package catalog
import ( import (
"github.com/hashicorp/nomad/drivers/docker"
"github.com/hashicorp/nomad/drivers/exec" "github.com/hashicorp/nomad/drivers/exec"
"github.com/hashicorp/nomad/drivers/java"
"github.com/hashicorp/nomad/drivers/qemu"
"github.com/hashicorp/nomad/drivers/rawexec"
) )
// This file is where all builtin plugins should be registered in the catalog. // This file is where all builtin plugins should be registered in the catalog.
// Plugins with build restrictions should be placed in the appropriate // Plugins with build restrictions should be placed in the appropriate
// register_XXX.go file. // register_XXX.go file.
func init() { func init() {
RegisterDeferredConfig(rawexec.PluginID, rawexec.PluginConfig, rawexec.PluginLoader) // RegisterDeferredConfig(rawexec.PluginID, rawexec.PluginConfig, rawexec.PluginLoader)
Register(exec.PluginID, exec.PluginConfig) Register(exec.PluginID, exec.PluginConfig)
Register(qemu.PluginID, qemu.PluginConfig) // Register(qemu.PluginID, qemu.PluginConfig)
Register(java.PluginID, java.PluginConfig) // Register(java.PluginID, java.PluginConfig)
RegisterDeferredConfig(docker.PluginID, docker.PluginConfig, docker.PluginLoader) // RegisterDeferredConfig(docker.PluginID, docker.PluginConfig, docker.PluginLoader)
} }

View File

@ -1,10 +1,8 @@
package catalog package catalog
import "github.com/hashicorp/nomad/drivers/rkt"
// This file is where all builtin plugins should be registered in the catalog. // This file is where all builtin plugins should be registered in the catalog.
// Plugins with build restrictions should be placed in the appropriate // Plugins with build restrictions should be placed in the appropriate
// register_XXX.go file. // register_XXX.go file.
func init() { func init() {
RegisterDeferredConfig(rkt.PluginID, rkt.PluginConfig, rkt.PluginLoader) // RegisterDeferredConfig(rkt.PluginID, rkt.PluginConfig, rkt.PluginLoader)
} }