open-nomad/drivers/shared/executor/legacy_executor_wrapper.go
Mahmood Ali ac185b41e2 Legacy executors are executors after all
This fixes a bug where pre-0.9 executors fail to recover after an
upgrade.

The bug is that legacyExecutorWrapper didn't get updated with the
ExecStreaming function, and thus failed to implement the Executor
interface. Sadly, this meant that all recovery attempts failed at the
runtime check in
b312aacbc9/drivers/shared/executor/utils.go (L103-L110).
2020-11-10 10:20:07 -05:00

222 lines
5.6 KiB
Go

package executor
import (
"encoding/gob"
"fmt"
"net/rpc"
"os"
"syscall"
"time"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/net/context"
)
const (
// pre09DockerSignal is the sentinel value passed as the signal argument of
// legacyExecutorWrapper.Shutdown. When Shutdown receives it, the ShutDown
// RPC on the pre09 executor is skipped and only Exit is called.
pre09DockerSignal = "docker"
)
// init registers with gob the concrete types that can appear inside interface
// values, since the Task structs have to be serialized and de-serialized over
// the wire between drivers and the executor.
func init() {
	wireTypes := []interface{}{
		[]interface{}{},
		map[string]interface{}{},
		[]map[string]string{},
		[]map[string]int{},
		syscall.Signal(0x1),
	}
	for _, sample := range wireTypes {
		gob.Register(sample)
	}
}
// legacyExecutorWrapper adapts a pre-0.9 executor, reached through a
// pre09ExecutorRPC client, to the Executor interface.
type legacyExecutorWrapper struct {
// client issues the RPC calls to the legacy executor
client *pre09ExecutorRPC
// logger receives warnings, e.g. when periodic stats collection fails
logger hclog.Logger
}
// validate at compile time that legacyExecutorWrapper is an executor
var _ Executor = (*legacyExecutorWrapper)(nil)
// Launch is not available through the legacy wrapper; it always returns a nil
// ProcessState and an "operation not supported" error.
func (l *legacyExecutorWrapper) Launch(launchCmd *ExecCommand) (*ProcessState, error) {
	err := fmt.Errorf("operation not supported for legacy exec wrapper")
	return nil, err
}
// Wait issues the Wait RPC against the legacy executor and copies the reply
// into a freshly allocated ProcessState. On RPC failure it returns a nil state
// and the error. The ctx argument is not consulted by this implementation.
func (l *legacyExecutorWrapper) Wait(ctx context.Context) (*ProcessState, error) {
	legacyState, err := l.client.Wait()
	if err != nil {
		return nil, err
	}

	state := new(ProcessState)
	state.Pid = legacyState.Pid
	state.ExitCode = legacyState.ExitCode
	state.Signal = legacyState.Signal
	state.Time = legacyState.Time
	return state, nil
}
// Shutdown stops the legacy executor: it calls the ShutDown RPC (unless signal
// equals pre09DockerSignal) and then the Exit RPC, returning the first error
// encountered. The gracePeriod argument is not used by this implementation.
func (l *legacyExecutorWrapper) Shutdown(signal string, gracePeriod time.Duration) error {
	// The legacy docker driver only used the executor to start a syslog server
	// for logging. Thus calling ShutDown for docker will always return an error
	// because it never started a process through the executor. If signal is set
	// to 'docker' then we'll skip the ShutDown RPC and just call Exit.
	//
	// This is painful to look at but will only be around a few releases
	skipShutDownRPC := signal == pre09DockerSignal
	if !skipShutDownRPC {
		if err := l.client.ShutDown(); err != nil {
			return err
		}
	}

	return l.client.Exit()
}
// UpdateResources is not available through the legacy wrapper; it always
// returns an "operation not supported" error.
func (l *legacyExecutorWrapper) UpdateResources(*drivers.Resources) error {
	err := fmt.Errorf("operation not supported for legacy exec wrapper")
	return err
}
// Version asks the legacy executor for its version over RPC and returns it in
// a freshly allocated ExecutorVersion. On RPC failure it returns nil and the
// error.
func (l *legacyExecutorWrapper) Version() (*ExecutorVersion, error) {
	reported, err := l.client.Version()
	if err != nil {
		return nil, err
	}

	result := new(ExecutorVersion)
	result.Version = reported.Version
	return result, nil
}
// Stats fetches one resource-usage sample from the legacy executor right away,
// queues it on a channel with a buffer of one, and starts handleStats in a
// goroutine to keep feeding that channel every interval. If the initial fetch
// fails, no goroutine is started and a nil channel is returned with the error.
func (l *legacyExecutorWrapper) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
	usageCh := make(chan *cstructs.TaskResourceUsage, 1)

	first, err := l.client.Stats()
	if err != nil {
		close(usageCh)
		return nil, err
	}

	// Non-blocking send of the initial sample.
	select {
	case usageCh <- first:
	default:
	}

	go l.handleStats(ctx, interval, usageCh)

	return usageCh, nil
}
// handleStats polls the legacy executor for resource usage once per interval
// and forwards each sample on ch. It closes ch when it returns, which happens
// when ctx is done or when the RPC client reports rpc.ErrShutdown. Any other
// RPC error is logged and polling resumes at the next tick. A sample is
// dropped rather than blocking when ch cannot accept it.
func (l *legacyExecutorWrapper) handleStats(ctx context.Context, interval time.Duration, ch chan *cstructs.TaskResourceUsage) {
	defer close(ch)

	ticker := time.NewTicker(interval)
	// Release the ticker once polling ends.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// The caller is no longer interested in stats; stop polling so
			// this goroutine does not outlive the request.
			return
		case <-ticker.C:
		}

		stats, err := l.client.Stats()
		if err != nil {
			if err == rpc.ErrShutdown {
				return
			}
			l.logger.Warn("stats collection from legacy executor failed, waiting for next interval", "error", err)
			continue
		}
		if stats == nil {
			continue
		}

		// Non-blocking send: never stall the poll loop on a slow receiver.
		select {
		case ch <- stats:
		default:
		}
	}
}
// Signal forwards s to the legacy executor through the RPC client and returns
// the RPC result.
func (l *legacyExecutorWrapper) Signal(s os.Signal) error {
	err := l.client.Signal(s)
	return err
}
// Exec forwards the command, its arguments and the deadline to the legacy
// executor through the RPC client and returns the three values the client
// reports unchanged.
func (l *legacyExecutorWrapper) Exec(deadline time.Time, cmd string, args []string) ([]byte, int, error) {
	output, exitCode, err := l.client.Exec(deadline, cmd, args)
	return output, exitCode, err
}
// ExecStreaming is not available through the legacy wrapper; it always returns
// an "operation not supported" error.
func (l *legacyExecutorWrapper) ExecStreaming(ctx context.Context, cmd []string, tty bool,
	stream drivers.ExecTaskStream) error {
	err := fmt.Errorf("operation not supported for legacy exec wrapper")
	return err
}
// pre09ExecutorRPC is the net/rpc client side for a pre-0.9 executor. Its
// methods call the "Plugin.*" RPC endpoints on the wrapped client.
type pre09ExecutorRPC struct {
// client is the RPC connection used for every call
client *rpc.Client
// logger is stored at construction; none of the methods visible in this
// file write to it
logger hclog.Logger
}
// pre09ExecCmdArgs is the request payload sent with the "Plugin.Exec" RPC.
type pre09ExecCmdArgs struct {
// Deadline is the deadline passed by the caller of Exec
Deadline time.Time
// Name is the command name passed by the caller of Exec
Name string
// Args are the command arguments passed by the caller of Exec
Args []string
}
// pre09ExecCmdReturn is the reply payload of the "Plugin.Exec" RPC. Its fields
// become the first two return values of pre09ExecutorRPC.Exec.
type pre09ExecCmdReturn struct {
Output []byte
Code int
}
// Wait calls the "Plugin.Wait" RPC and decodes the reply into a ProcessState.
// The returned pointer is never nil, even when the call fails, so callers must
// check the error first.
func (e *pre09ExecutorRPC) Wait() (*ProcessState, error) {
	state := new(ProcessState)
	err := e.client.Call("Plugin.Wait", new(interface{}), state)
	return state, err
}
// ShutDown calls the "Plugin.ShutDown" RPC with empty arguments and discards
// the reply, returning only the call's error.
func (e *pre09ExecutorRPC) ShutDown() error {
	var (
		args  interface{}
		reply interface{}
	)
	return e.client.Call("Plugin.ShutDown", &args, &reply)
}
// Exit calls the "Plugin.Exit" RPC with empty arguments and discards the
// reply, returning only the call's error.
func (e *pre09ExecutorRPC) Exit() error {
	var (
		args  interface{}
		reply interface{}
	)
	return e.client.Call("Plugin.Exit", &args, &reply)
}
// Version calls the "Plugin.Version" RPC and decodes the reply into an
// ExecutorVersion. The returned pointer is never nil, even when the call
// fails, so callers must check the error first.
func (e *pre09ExecutorRPC) Version() (*ExecutorVersion, error) {
	reply := new(ExecutorVersion)
	err := e.client.Call("Plugin.Version", new(interface{}), reply)
	return reply, err
}
// Stats calls the "Plugin.Stats" RPC and decodes the reply into a
// TaskResourceUsage. The returned pointer is never nil, even when the call
// fails, so callers must check the error first.
func (e *pre09ExecutorRPC) Stats() (*cstructs.TaskResourceUsage, error) {
	usage := new(cstructs.TaskResourceUsage)
	err := e.client.Call("Plugin.Stats", new(interface{}), usage)
	return usage, err
}
// Signal calls the "Plugin.Signal" RPC, passing a pointer to s as the
// argument, and discards the reply.
func (e *pre09ExecutorRPC) Signal(s os.Signal) error {
	var reply interface{}
	return e.client.Call("Plugin.Signal", &s, &reply)
}
// Exec calls the "Plugin.Exec" RPC with the given deadline, command name and
// arguments. It returns the Output and Code fields of the reply together with
// the call's error; when no reply was decoded it returns nil output and code 0
// alongside that error.
func (e *pre09ExecutorRPC) Exec(deadline time.Time, name string, args []string) ([]byte, int, error) {
	var reply *pre09ExecCmdReturn
	request := pre09ExecCmdArgs{Deadline: deadline, Name: name, Args: args}

	err := e.client.Call("Plugin.Exec", request, &reply)
	if reply != nil {
		return reply.Output, reply.Code, err
	}
	return nil, 0, err
}
// pre09ExecutorPlugin is the client-only plugin for pre-0.9 executors: its
// Client method builds a legacyExecutorWrapper and its Server method always
// returns an error.
type pre09ExecutorPlugin struct {
// logger is handed to the wrapper and RPC client created by Client
logger hclog.Logger
}
// newPre09ExecutorPlugin returns a pre09ExecutorPlugin that carries logger,
// typed as a plugin.Plugin.
func newPre09ExecutorPlugin(logger hclog.Logger) plugin.Plugin {
	p := new(pre09ExecutorPlugin)
	p.logger = logger
	return p
}
// Client wraps the RPC connection c in a pre09ExecutorRPC and returns a
// legacyExecutorWrapper around it. Both share the plugin's logger. The broker
// argument is not used, and the returned error is always nil.
func (p *pre09ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
	rpcClient := &pre09ExecutorRPC{client: c, logger: p.logger}
	wrapper := &legacyExecutorWrapper{client: rpcClient, logger: p.logger}
	return wrapper, nil
}
// Server is not implemented for the legacy plugin; it always returns a nil
// value and a "client only supported" error.
func (p *pre09ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
	err := fmt.Errorf("client only supported")
	return nil, err
}