open-nomad/drivers/java/driver.go
Alex Dadgar 693f244cce Plugin client's handle plugin dying
This PR plumbs the plugins done ctx through the base and driver plugin
clients (device already had it). Further, it adds generic handling of
gRPC stream errors.
2018-11-12 17:09:27 -08:00

557 lines
15 KiB
Go

package java
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
)
const (
// pluginName is the name of the plugin
pluginName = "java"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
// driverAttr is the key populated in Node Attributes to indicate presence of the Java driver
driverAttr = "driver.java"
// driverVersionAttr is the Node Attributes key under which the fingerprint
// stores the detected Java version.
driverVersionAttr = "driver.java.version"
)
var (
// PluginID is the java plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
}
// PluginConfig is the java driver factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewDriver(l) },
}
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
PluginVersion: "0.1.0",
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC.
// It is an empty object: the driver declares no plugin-level options.
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{})
// taskConfigSpec is the hcl specification for the driver config section of
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
// Either `class` or `jar_path` must be set, but that constraint is
// not expressible in hclspec. Both are marked optional here and the
// requirement is checked explicitly in StartTask.
"class": hclspec.NewAttr("class", "string", false),
"classpath": hclspec.NewAttr("classpath", "string", false),
"jar_path": hclspec.NewAttr("jar_path", "string", false),
"java_opts": hclspec.NewAttr("java_opts", "list(string)", false),
"args": hclspec.NewAttr("args", "list(string)", false),
})
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports. FSIsolation is switched to
// chroot by init when running on Linux.
capabilities = &drivers.Capabilities{
SendSignals: false,
Exec: false,
FSIsolation: cstructs.FSIsolationNone,
}
// Compile-time assertion that *Driver implements drivers.DriverPlugin.
_ drivers.DriverPlugin = (*Driver)(nil)
)
// init adjusts the advertised capabilities for the host platform: on Linux
// the driver reports chroot filesystem isolation; every other platform keeps
// the default of no filesystem isolation.
func init() {
	if runtime.GOOS != "linux" {
		return
	}
	capabilities.FSIsolation = cstructs.FSIsolationChroot
}
// TaskConfig is the driver configuration of a taskConfig within a job.
// StartTask rejects a configuration in which both Class and JarPath are empty.
type TaskConfig struct {
// Class is the class name placed on the java command line.
Class string `codec:"class"`
// ClassPath, when set, is passed to java via "-cp".
ClassPath string `codec:"classpath"`
// JarPath, when set, is passed to java via "-jar".
JarPath string `codec:"jar_path"`
// JvmOpts are placed first on the java command line, before any other argument.
JvmOpts []string `codec:"java_opts"`
Args []string `codec:"args"` // arguments appended last on the java command line, after the jar/class
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the taskConfig state and handler
// during recovery.
type TaskState struct {
// ReattachConfig describes how to reattach to the executor plugin process.
ReattachConfig *shared.ReattachConfig
// TaskConfig is the task configuration the task was started with.
TaskConfig *drivers.TaskConfig
// Pid is the process ID reported by the executor when the task was launched.
Pid int
// StartedAt is the time the task was started, rounded to the millisecond.
StartedAt time.Time
}
// Driver is the task driver plugin that runs Java jars or classes by
// launching the java binary through an executor.
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// tasks is the in memory datastore mapping taskIDs to taskHandle
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// nomadConfig is the client agent's configuration; it is populated by
// SetConfig and stays nil until then.
nomadConfig *base.ClientDriverConfig
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
// NewDriver builds a java driver plugin. The supplied logger is scoped to the
// plugin name, and a cancellable background context is created whose cancel
// function is kept on the driver as signalShutdown.
func NewDriver(logger hclog.Logger) drivers.DriverPlugin {
	named := logger.Named(pluginName)
	ctx, cancel := context.WithCancel(context.Background())

	drv := &Driver{
		ctx:            ctx,
		signalShutdown: cancel,
		logger:         named,
	}
	drv.eventer = eventer.NewEventer(ctx, named)
	drv.tasks = newTaskStore()
	return drv
}
// PluginInfo returns the static plugin metadata (type, API version, plugin
// version and name) for the java driver. The error is always nil.
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
// ConfigSchema returns the hcl specification for the plugin-level
// configuration, which is an empty object. The error is always nil.
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
// SetConfig records the client agent's driver configuration on the driver.
// The raw plugin configuration bytes are ignored, and a nil cfg leaves the
// driver untouched. The error is always nil.
func (d *Driver) SetConfig(_ []byte, cfg *base.ClientAgentConfig) error {
	if cfg == nil {
		return nil
	}
	d.nomadConfig = cfg.Driver
	return nil
}
// TaskConfigSchema returns the hcl specification for the driver config
// section of a task (class, classpath, jar_path, java_opts, args). The error
// is always nil.
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
// Capabilities returns the package-level capabilities value describing the
// optional features this driver supports. The error is always nil.
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
// Fingerprint starts a background goroutine that periodically publishes
// driver fingerprints and returns the unbuffered channel they are sent on.
// The error is always nil.
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
	fingerprints := make(chan *drivers.Fingerprint)
	go d.handleFingerprint(ctx, fingerprints)
	return fingerprints, nil
}
// handleFingerprint sends a fingerprint on ch immediately and then once every
// fingerprintPeriod until either ctx or the driver's own context is done.
//
// The send itself is also guarded by both contexts, so the goroutine cannot
// stay blocked forever on ch when the receiver has stopped reading, and the
// timer is stopped on exit.
func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
	// A zero duration makes the first fingerprint fire right away.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-d.ctx.Done():
			return
		case <-timer.C:
			// Re-arm before doing the work so the period is measured from
			// the tick, not from when the receiver picked the value up.
			timer.Reset(fingerprintPeriod)
		}

		fp := d.buildFingerprint()
		select {
		case <-ctx.Done():
			return
		case <-d.ctx.Done():
			return
		case ch <- fp:
		}
	}
}
// buildFingerprint reports whether the java driver can be used on this host.
//
// On Linux the driver is unhealthy unless the process runs as root and a
// cgroup mount point is found. If the Java version cannot be determined, the
// health is reported as undetected with an empty description. Otherwise the
// fingerprint is healthy and carries the driver, version, runtime and vm
// attributes.
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
	result := &drivers.Fingerprint{
		Attributes:        map[string]string{},
		Health:            drivers.HealthStateHealthy,
		HealthDescription: "healthy",
	}

	// unhealthy marks the fingerprint unhealthy with the given reason.
	unhealthy := func(reason string) *drivers.Fingerprint {
		result.Health = drivers.HealthStateUnhealthy
		result.HealthDescription = reason
		return result
	}

	if runtime.GOOS == "linux" {
		// Only enable if we are root and cgroups are mounted when running on
		// a linux system.
		if syscall.Geteuid() != 0 {
			return unhealthy("run as root")
		}

		cgroupDir, err := fingerprint.FindCgroupMountpointDir()
		switch {
		case err != nil:
			unhealthy("failed to discover cgroup mount point")
			d.logger.Warn(result.HealthDescription, "error", err)
			return result
		case cgroupDir == "":
			return unhealthy("cgroups are unavailable")
		}
	}

	javaVersion, javaRuntime, javaVM, err := javaVersionInfo()
	if err != nil {
		// Not finding java is not an error; it just means the driver
		// cannot be used on this host.
		result.Health = drivers.HealthStateUndetected
		result.HealthDescription = ""
		return result
	}

	result.Attributes[driverAttr] = "1"
	result.Attributes[driverVersionAttr] = javaVersion
	result.Attributes["driver.java.runtime"] = javaRuntime
	result.Attributes["driver.java.vm"] = javaVM
	return result
}
// RecoverTask re-attaches the driver to a task described by the driver state
// stored in handle.
//
// It is a no-op when the task ID is already tracked. Otherwise the stored
// TaskState is decoded, the executor plugin is reattached, and a new
// taskHandle is registered and its run method started in a goroutine.
//
// An error is returned when handle is nil, when the state cannot be decoded
// or carries no task config, when the reattach config cannot be converted, or
// when reattaching to the executor fails.
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
	if handle == nil {
		return fmt.Errorf("handle cannot be nil")
	}

	// If already attached to handle there's nothing to recover.
	if _, ok := d.tasks.Get(handle.Config.ID); ok {
		d.logger.Debug("nothing to recover; task already exists",
			"task_id", handle.Config.ID,
			"task_name", handle.Config.Name,
		)
		return nil
	}

	var taskState TaskState
	if err := handle.GetDriverState(&taskState); err != nil {
		d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to decode taskConfig state from handle: %v", err)
	}

	// The task config is dereferenced below; fail cleanly instead of
	// panicking if the stored state does not carry one.
	if taskState.TaskConfig == nil {
		d.logger.Error("taskConfig state from handle is missing the task config", "task_id", handle.Config.ID)
		return fmt.Errorf("taskConfig state from handle is missing the task config")
	}

	plugRC, err := shared.ReattachConfigToGoPlugin(taskState.ReattachConfig)
	if err != nil {
		d.logger.Error("failed to build ReattachConfig from taskConfig state", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
	}

	pluginConfig := &plugin.ClientConfig{
		Reattach: plugRC,
	}

	execImpl, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
	if err != nil {
		d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
		return fmt.Errorf("failed to reattach to executor: %v", err)
	}

	h := &taskHandle{
		exec:         execImpl,
		pid:          taskState.Pid,
		pluginClient: pluginClient,
		taskConfig:   taskState.TaskConfig,
		procState:    drivers.TaskStateRunning,
		startedAt:    taskState.StartedAt,
		exitResult:   &drivers.ExitResult{},
		// Recovered handles need a logger just like freshly started ones;
		// without it any later use of h.logger is a nil interface call.
		logger: d.logger,
	}

	d.tasks.Set(taskState.TaskConfig.ID, h)

	go h.run()
	return nil
}
// StartTask launches the java binary for the given task through a newly
// created executor plugin and returns a handle carrying the state needed to
// recover the task later.
//
// It fails when the task ID is already tracked, when the driver config cannot
// be decoded or names neither a class nor a jar, when the java binary cannot
// be located, or when the executor cannot be created or cannot launch the
// command. The returned driver network is always nil.
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
	if _, exists := d.tasks.Get(cfg.ID); exists {
		return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
	}

	var javaCfg TaskConfig
	if err := cfg.DecodeDriverConfig(&javaCfg); err != nil {
		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
	}

	// hclspec cannot express "one of class or jar_path", so enforce it here.
	if javaCfg.Class == "" && javaCfg.JarPath == "" {
		return nil, nil, fmt.Errorf("jar_path or class must be specified")
	}

	javaBin, err := GetAbsolutePath("java")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to find java binary: %s", err)
	}

	cmdArgs := javaCmdArgs(javaCfg)
	d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", javaCfg), "args", cmdArgs)

	handle := drivers.NewTaskHandle(pluginName)
	handle.Config = cfg

	// The executor plugin writes its own log next to the task's files.
	execImpl, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, d.nomadConfig, &dstructs.ExecutorConfig{
		LogFile:  filepath.Join(cfg.TaskDir().Dir, "executor.out"),
		LogLevel: "debug",
	})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create executor: %v", err)
	}

	launched, err := execImpl.Launch(&executor.ExecCommand{
		Cmd:            javaBin,
		Args:           cmdArgs,
		Env:            cfg.EnvList(),
		User:           cfg.User,
		ResourceLimits: true,
		Resources: &executor.Resources{
			CPU:      cfg.Resources.NomadResources.CPU,
			MemoryMB: cfg.Resources.NomadResources.MemoryMB,
			DiskMB:   cfg.Resources.NomadResources.DiskMB,
		},
		TaskDir:    cfg.TaskDir().Dir,
		StdoutPath: cfg.StdoutPath,
		StderrPath: cfg.StderrPath,
	})
	if err != nil {
		pluginClient.Kill()
		return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
	}

	startedAt := time.Now().Round(time.Millisecond)
	h := &taskHandle{
		exec:         execImpl,
		pid:          launched.Pid,
		pluginClient: pluginClient,
		taskConfig:   cfg,
		procState:    drivers.TaskStateRunning,
		startedAt:    startedAt,
		logger:       d.logger,
	}

	// Persist what RecoverTask needs to reattach to the executor.
	recoverable := &TaskState{
		ReattachConfig: shared.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
		Pid:            launched.Pid,
		TaskConfig:     cfg,
		StartedAt:      startedAt,
	}
	if err := handle.SetDriverState(recoverable); err != nil {
		d.logger.Error("failed to start task, error setting driver state", "error", err)
		execImpl.Shutdown("", 0)
		pluginClient.Kill()
		return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
	}

	d.tasks.Set(cfg.ID, h)
	go h.run()
	return handle, nil, nil
}
// javaCmdArgs assembles the argument list for the java binary from the task's
// driver config, in this order: JVM options, "-cp <classpath>",
// "-jar <jar_path>", the class name, and finally the task's own args. Unset
// pieces are skipped. The returned slice is never nil.
func javaCmdArgs(driverConfig TaskConfig) []string {
	// At most four fixed tokens ("-cp", classpath, "-jar", jar) plus the
	// class name sit between the JVM options and the trailing args.
	cmdline := make([]string, 0, len(driverConfig.JvmOpts)+len(driverConfig.Args)+5)

	cmdline = append(cmdline, driverConfig.JvmOpts...)

	if cp := driverConfig.ClassPath; cp != "" {
		cmdline = append(cmdline, "-cp", cp)
	}
	if jar := driverConfig.JarPath; jar != "" {
		cmdline = append(cmdline, "-jar", jar)
	}
	if class := driverConfig.Class; class != "" {
		cmdline = append(cmdline, class)
	}

	return append(cmdline, driverConfig.Args...)
}
// WaitTask returns a channel on which the exit result of the task is
// delivered. The waiting happens in a background goroutine. It returns
// drivers.ErrTaskNotFound when the task ID is not tracked.
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
	h, found := d.tasks.Get(taskID)
	if !found {
		return nil, drivers.ErrTaskNotFound
	}

	results := make(chan *drivers.ExitResult)
	go d.handleWait(ctx, h, results)
	return results, nil
}
// handleWait blocks until the executor reports that the task's process has
// finished, then tries to deliver the outcome on ch. Delivery is abandoned if
// ctx or the driver's context is done first. ch is always closed on return.
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
	defer close(ch)

	result := new(drivers.ExitResult)
	if state, err := handle.exec.Wait(); err != nil {
		result.Err = fmt.Errorf("executor: error waiting on process: %v", err)
	} else {
		result.ExitCode = state.ExitCode
		result.Signal = state.Signal
	}

	select {
	case ch <- result:
	case <-ctx.Done():
	case <-d.ctx.Done():
	}
}
// StopTask asks the task's executor to shut the task down with the given
// signal and timeout. A shutdown error is ignored when the executor plugin has
// already exited. It returns drivers.ErrTaskNotFound when the task ID is not
// tracked.
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
	h, found := d.tasks.Get(taskID)
	if !found {
		return drivers.ErrTaskNotFound
	}

	err := h.exec.Shutdown(signal, timeout)
	// Exited is only consulted when the shutdown actually failed.
	if err == nil || h.pluginClient.Exited() {
		return nil
	}
	return fmt.Errorf("executor Shutdown failed: %v", err)
}
// DestroyTask removes the task from the driver's task store and tears down
// its executor plugin.
//
// A task that is still running is only destroyed when force is true;
// otherwise an error is returned. If the executor plugin has not exited, a
// running task is shut down first (a failure there is logged, not returned)
// and the plugin client is then killed. It returns drivers.ErrTaskNotFound
// when the task ID is not tracked.
func (d *Driver) DestroyTask(taskID string, force bool) error {
	handle, ok := d.tasks.Get(taskID)
	if !ok {
		return drivers.ErrTaskNotFound
	}

	if handle.IsRunning() && !force {
		return fmt.Errorf("cannot destroy running task")
	}

	if !handle.pluginClient.Exited() {
		if handle.IsRunning() {
			if err := handle.exec.Shutdown("", 0); err != nil {
				// Log through the driver's logger: handles built by
				// RecoverTask do not set handle.logger, so using it here
				// would be a nil interface call.
				d.logger.Error("destroying executor failed", "err", err)
			}
		}

		handle.pluginClient.Kill()
	}

	d.tasks.Delete(taskID)
	return nil
}
// InspectTask returns the status reported by the task's handle, or
// drivers.ErrTaskNotFound when the task ID is not tracked.
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
	if h, found := d.tasks.Get(taskID); found {
		return h.TaskStatus(), nil
	}
	return nil, drivers.ErrTaskNotFound
}
// TaskStats returns the resource usage reported by the task's executor, or
// drivers.ErrTaskNotFound when the task ID is not tracked.
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
	if h, found := d.tasks.Get(taskID); found {
		return h.exec.Stats()
	}
	return nil, drivers.ErrTaskNotFound
}
// TaskEvents returns the task event channel that the driver's eventer
// provides for ctx.
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
// SignalTask sends the named signal to the task through its executor.
//
// The name is resolved via signals.SignalLookup. When the name is not known,
// a warning is logged and os.Interrupt is sent instead. It returns
// drivers.ErrTaskNotFound when the task ID is not tracked, otherwise whatever
// the executor's Signal call returns.
func (d *Driver) SignalTask(taskID string, signal string) error {
	handle, ok := d.tasks.Get(taskID)
	if !ok {
		return drivers.ErrTaskNotFound
	}

	sig := os.Interrupt
	if s, ok := signals.SignalLookup[signal]; ok {
		sig = s
	} else {
		// Only warn on the fallback path; a successful lookup is the
		// normal case and must stay silent.
		d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.taskConfig.ID)
	}
	return handle.exec.Signal(sig)
}
// ExecTask runs cmd through the task's executor, using cmd[0] as the command
// and the rest as its arguments, with a deadline of now plus timeout. It
// returns the command's output and exit code.
//
// An empty cmd is rejected before the task lookup; an untracked task ID
// yields drivers.ErrTaskNotFound; an executor error is returned unchanged.
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
	if len(cmd) == 0 {
		return nil, fmt.Errorf("error cmd must have at least one value")
	}

	h, found := d.tasks.Get(taskID)
	if !found {
		return nil, drivers.ErrTaskNotFound
	}

	deadline := time.Now().Add(timeout)
	name, rest := cmd[0], cmd[1:]

	stdout, code, err := h.exec.Exec(deadline, name, rest)
	if err != nil {
		return nil, err
	}

	res := &drivers.ExecTaskResult{
		Stdout:     stdout,
		ExitResult: &drivers.ExitResult{ExitCode: code},
	}
	return res, nil
}
// GetAbsolutePath locates bin on the PATH and returns its location with all
// symlinks resolved. If bin cannot be found, the returned path is empty and
// the error names the binary; a symlink resolution failure is returned as-is.
func GetAbsolutePath(bin string) (string, error) {
	resolved, lookErr := exec.LookPath(bin)
	if lookErr == nil {
		return filepath.EvalSymlinks(resolved)
	}
	return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, lookErr)
}