Introduced a method in executor to launch syslog server
This commit is contained in:
parent
4bc865e47f
commit
3c7b83b393
|
@ -20,7 +20,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver/logging"
|
||||
"github.com/hashicorp/nomad/client/driver/executor"
|
||||
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
||||
"github.com/hashicorp/nomad/helper/discover"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -101,7 +101,7 @@ type dockerPID struct {
|
|||
|
||||
type DockerHandle struct {
|
||||
pluginClient *plugin.Client
|
||||
logCollector logging.LogCollector
|
||||
executor executor.Executor
|
||||
client *docker.Client
|
||||
logger *log.Logger
|
||||
cleanupContainer bool
|
||||
|
@ -533,23 +533,23 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
|
||||
}
|
||||
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name))
|
||||
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
|
||||
pluginConfig := &plugin.ClientConfig{
|
||||
Cmd: exec.Command(bin, "syslog", pluginLogFile),
|
||||
Cmd: exec.Command(bin, "executor", pluginLogFile),
|
||||
}
|
||||
|
||||
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
|
||||
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logCollectorCtx := &logging.LogCollectorContext{
|
||||
TaskName: task.Name,
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
Task: task,
|
||||
AllocDir: ctx.AllocDir,
|
||||
LogConfig: task.LogConfig,
|
||||
PortLowerBound: d.config.ClientMinPort,
|
||||
PortUpperBound: d.config.ClientMaxPort,
|
||||
}
|
||||
ss, err := logCollector.LaunchCollector(logCollectorCtx)
|
||||
ss, err := exec.LaunchSyslogServer(executorCtx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
|
||||
}
|
||||
|
@ -629,7 +629,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
|||
maxKill := d.DriverContext.config.MaxKillTimeout
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
logCollector: logCollector,
|
||||
executor: exec,
|
||||
pluginClient: pluginClient,
|
||||
cleanupContainer: cleanupContainer,
|
||||
cleanupImage: cleanupImage,
|
||||
|
@ -686,7 +686,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
|||
if !found {
|
||||
return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err)
|
||||
}
|
||||
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
|
||||
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
|
||||
if err != nil {
|
||||
d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err)
|
||||
if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil {
|
||||
|
@ -698,7 +698,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
|
|||
// Return a driver handle
|
||||
h := &DockerHandle{
|
||||
client: client,
|
||||
logCollector: logCollector,
|
||||
executor: exec,
|
||||
pluginClient: pluginClient,
|
||||
cleanupContainer: cleanupContainer,
|
||||
cleanupImage: cleanupImage,
|
||||
|
@ -743,7 +743,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *DockerHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
|
||||
if err := h.executor.UpdateTask(task); err != nil {
|
||||
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
|
||||
}
|
||||
|
||||
|
@ -824,7 +824,7 @@ func (h *DockerHandle) run() {
|
|||
close(h.waitCh)
|
||||
|
||||
// Shutdown the syslog collector
|
||||
if err := h.logCollector.Exit(); err != nil {
|
||||
if err := h.executor.Exit(); err != nil {
|
||||
h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err)
|
||||
}
|
||||
h.pluginClient.Kill()
|
||||
|
|
|
@ -145,7 +145,7 @@ func TestDockerDriver_Handle(t *testing.T) {
|
|||
pluginConfig := &plugin.ClientConfig{
|
||||
Cmd: exec.Command(bin, "syslog", f.Name()),
|
||||
}
|
||||
logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{})
|
||||
exec, pluginClient, err := createExecutor(pluginConfig, os.Stdout, &config.Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("got an err: %v", err)
|
||||
}
|
||||
|
@ -154,7 +154,7 @@ func TestDockerDriver_Handle(t *testing.T) {
|
|||
h := &DockerHandle{
|
||||
version: "version",
|
||||
imageID: "imageid",
|
||||
logCollector: logCollector,
|
||||
executor: exec,
|
||||
pluginClient: pluginClient,
|
||||
containerID: "containerid",
|
||||
killTimeout: 5 * time.Nanosecond,
|
||||
|
|
|
@ -100,16 +100,17 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
TaskName: task.Name,
|
||||
TaskResources: task.Resources,
|
||||
LogConfig: task.LogConfig,
|
||||
ResourceLimits: true,
|
||||
FSIsolation: true,
|
||||
UnprivilegedUser: true,
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
Task: task,
|
||||
}
|
||||
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
|
||||
ps, err := exec.LaunchCmd(&executor.ExecCommand{
|
||||
Cmd: command,
|
||||
Args: driverConfig.Args,
|
||||
FSIsolation: true,
|
||||
ResourceLimits: true,
|
||||
User: cstructs.DefaultUnpriviledgedUser,
|
||||
}, executorCtx)
|
||||
if err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
|
||||
|
@ -217,7 +218,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *execHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
h.executor.UpdateTask(task)
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
|
|
@ -2,7 +2,9 @@ package executor
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
|
@ -21,6 +23,18 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// Executor is the interface which allows a driver to launch and supervise
|
||||
// a process
|
||||
type Executor interface {
|
||||
LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error)
|
||||
LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error)
|
||||
Wait() (*ProcessState, error)
|
||||
ShutDown() error
|
||||
Exit() error
|
||||
UpdateLogConfig(logConfig *structs.LogConfig) error
|
||||
UpdateTask(task *structs.Task) error
|
||||
}
|
||||
|
||||
// ExecutorContext holds context to configure the command user
|
||||
// wants to run and isolate it
|
||||
type ExecutorContext struct {
|
||||
|
@ -31,33 +45,36 @@ type ExecutorContext struct {
|
|||
// the task
|
||||
AllocDir *allocdir.AllocDir
|
||||
|
||||
// TaskName is the name of the Task
|
||||
TaskName string
|
||||
// Task is the task whose executor is being launched
|
||||
Task *structs.Task
|
||||
|
||||
// TaskResources are the resource constraints for the Task
|
||||
TaskResources *structs.Resources
|
||||
// PortUpperBound is the upper bound of the ports that we can use to start
|
||||
// the syslog server
|
||||
PortUpperBound uint
|
||||
|
||||
// FSIsolation is a flag for drivers to impose file system
|
||||
// isolation on certain platforms
|
||||
FSIsolation bool
|
||||
|
||||
// ResourceLimits is a flag for drivers to impose resource
|
||||
// contraints on a Task on certain platforms
|
||||
ResourceLimits bool
|
||||
|
||||
// UnprivilegedUser is a flag for drivers to make the process
|
||||
// run as nobody
|
||||
UnprivilegedUser bool
|
||||
|
||||
// LogConfig provides the configuration related to log rotation
|
||||
LogConfig *structs.LogConfig
|
||||
// PortLowerBound is the lower bound of the ports that we can use to start
|
||||
// the syslog server
|
||||
PortLowerBound uint
|
||||
}
|
||||
|
||||
// ExecCommand holds the user command and args. It's a lightweight replacement
|
||||
// of exec.Cmd for serialization purposes.
|
||||
// ExecCommand holds the user command, args, and other isolation related
|
||||
// settings.
|
||||
type ExecCommand struct {
|
||||
Cmd string
|
||||
// Cmd is the command that the user wants to run.
|
||||
Cmd string
|
||||
|
||||
// Args is the args of the command that the user wants to run.
|
||||
Args []string
|
||||
|
||||
// FSIsolation determines whether the command would be run in a chroot.
|
||||
FSIsolation bool
|
||||
|
||||
// User is the user which the executor uses to run the command.
|
||||
User string
|
||||
|
||||
// ResourceLimits determines whether resource limits are enforced by the
|
||||
// executor.
|
||||
ResourceLimits bool
|
||||
}
|
||||
|
||||
// ProcessState holds information about the state of a user process.
|
||||
|
@ -69,37 +86,44 @@ type ProcessState struct {
|
|||
Time time.Time
|
||||
}
|
||||
|
||||
// Executor is the interface which allows a driver to launch and supervise
|
||||
// a process
|
||||
type Executor interface {
|
||||
LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (*ProcessState, error)
|
||||
Wait() (*ProcessState, error)
|
||||
ShutDown() error
|
||||
Exit() error
|
||||
UpdateLogConfig(logConfig *structs.LogConfig) error
|
||||
// SyslogServerState holds the address and islation information of a launched
|
||||
// syslog server
|
||||
type SyslogServerState struct {
|
||||
IsolationConfig *cstructs.IsolationConfig
|
||||
Addr string
|
||||
}
|
||||
|
||||
// UniversalExecutor is an implementation of the Executor which launches and
|
||||
// supervises processes. In addition to process supervision it provides resource
|
||||
// and file system isolation
|
||||
type UniversalExecutor struct {
|
||||
cmd exec.Cmd
|
||||
ctx *ExecutorContext
|
||||
cmd exec.Cmd
|
||||
ctx *ExecutorContext
|
||||
command *ExecCommand
|
||||
|
||||
taskDir string
|
||||
groups *cgroupConfig.Cgroup
|
||||
exitState *ProcessState
|
||||
processExited chan interface{}
|
||||
lre *logging.FileRotator
|
||||
lro *logging.FileRotator
|
||||
|
||||
lre *logging.FileRotator
|
||||
lro *logging.FileRotator
|
||||
rotatorLock sync.Mutex
|
||||
|
||||
syslogServer *logging.SyslogServer
|
||||
syslogChan chan *logging.SyslogMessage
|
||||
|
||||
groups *cgroupConfig.Cgroup
|
||||
cgLock sync.Mutex
|
||||
|
||||
logger *log.Logger
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewExecutor returns an Executor
|
||||
func NewExecutor(logger *log.Logger) Executor {
|
||||
return &UniversalExecutor{logger: logger, processExited: make(chan interface{})}
|
||||
return &UniversalExecutor{
|
||||
logger: logger,
|
||||
processExited: make(chan interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
// LaunchCmd launches a process and returns it's state. It also configures an
|
||||
|
@ -108,6 +132,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
|
|||
e.logger.Printf("[DEBUG] executor: launching command %v %v", command.Cmd, strings.Join(command.Args, " "))
|
||||
|
||||
e.ctx = ctx
|
||||
e.command = command
|
||||
|
||||
// configuring the task dir
|
||||
if err := e.configureTaskDir(); err != nil {
|
||||
|
@ -121,29 +146,18 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
|
|||
}
|
||||
|
||||
// setting the user of the process
|
||||
if e.ctx.UnprivilegedUser {
|
||||
if err := e.runAs("nobody"); err != nil {
|
||||
if command.User != "" {
|
||||
if err := e.runAs(command.User); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
||||
lro, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", ctx.TaskName),
|
||||
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err)
|
||||
// Setup the loggers
|
||||
if err := e.configureLoggers(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.cmd.Stdout = lro
|
||||
e.lro = lro
|
||||
|
||||
lre, err := logging.NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", ctx.TaskName),
|
||||
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err)
|
||||
}
|
||||
e.cmd.Stderr = lre
|
||||
e.lre = lre
|
||||
e.cmd.Stdout = e.lro
|
||||
e.cmd.Stderr = e.lre
|
||||
|
||||
// setting the env, path and args for the command
|
||||
e.ctx.TaskEnv.Build()
|
||||
|
@ -165,6 +179,32 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
|
|||
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
|
||||
}
|
||||
|
||||
// configureLoggers sets up the standard out/error file rotators
|
||||
func (e *UniversalExecutor) configureLoggers() error {
|
||||
e.rotatorLock.Lock()
|
||||
defer e.rotatorLock.Unlock()
|
||||
|
||||
logFileSize := int64(e.ctx.Task.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
||||
if e.lro == nil {
|
||||
lro, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", e.ctx.Task.Name),
|
||||
e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.lro = lro
|
||||
}
|
||||
|
||||
if e.lre == nil {
|
||||
lre, err := logging.NewFileRotator(e.ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", e.ctx.Task.Name),
|
||||
e.ctx.Task.LogConfig.MaxFiles, logFileSize, e.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.lre = lre
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait waits until a process has exited and returns it's exitcode and errors
|
||||
func (e *UniversalExecutor) Wait() (*ProcessState, error) {
|
||||
<-e.processExited
|
||||
|
@ -173,7 +213,7 @@ func (e *UniversalExecutor) Wait() (*ProcessState, error) {
|
|||
|
||||
// UpdateLogConfig updates the log configuration
|
||||
func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error {
|
||||
e.ctx.LogConfig = logConfig
|
||||
e.ctx.Task.LogConfig = logConfig
|
||||
if e.lro == nil {
|
||||
return fmt.Errorf("log rotator for stdout doesn't exist")
|
||||
}
|
||||
|
@ -188,9 +228,22 @@ func (e *UniversalExecutor) UpdateLogConfig(logConfig *structs.LogConfig) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
|
||||
e.ctx.Task = task
|
||||
fileSize := int64(task.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
||||
e.lro.MaxFiles = task.LogConfig.MaxFiles
|
||||
e.lro.FileSize = fileSize
|
||||
e.lre.MaxFiles = task.LogConfig.MaxFiles
|
||||
e.lre.FileSize = fileSize
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *UniversalExecutor) wait() {
|
||||
defer close(e.processExited)
|
||||
err := e.cmd.Wait()
|
||||
if e.syslogServer != nil {
|
||||
e.syslogServer.Shutdown()
|
||||
}
|
||||
e.lre.Close()
|
||||
e.lro.Close()
|
||||
if err == nil {
|
||||
|
@ -203,14 +256,6 @@ func (e *UniversalExecutor) wait() {
|
|||
exitCode = status.ExitStatus()
|
||||
}
|
||||
}
|
||||
if e.ctx.FSIsolation {
|
||||
e.removeChrootMounts()
|
||||
}
|
||||
if e.ctx.ResourceLimits {
|
||||
e.lock.Lock()
|
||||
DestroyCgroup(e.groups)
|
||||
e.lock.Unlock()
|
||||
}
|
||||
e.exitState = &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}
|
||||
}
|
||||
|
||||
|
@ -224,7 +269,7 @@ var (
|
|||
// process
|
||||
func (e *UniversalExecutor) Exit() error {
|
||||
var merr multierror.Error
|
||||
if e.cmd.Process != nil {
|
||||
if e.command != nil && e.cmd.Process != nil {
|
||||
proc, err := os.FindProcess(e.cmd.Process.Pid)
|
||||
if err != nil {
|
||||
e.logger.Printf("[ERR] executor: can't find process with pid: %v, err: %v",
|
||||
|
@ -235,17 +280,17 @@ func (e *UniversalExecutor) Exit() error {
|
|||
}
|
||||
}
|
||||
|
||||
if e.ctx.FSIsolation {
|
||||
if e.command != nil && e.command.FSIsolation {
|
||||
if err := e.removeChrootMounts(); err != nil {
|
||||
merr.Errors = append(merr.Errors, err)
|
||||
}
|
||||
}
|
||||
if e.ctx.ResourceLimits {
|
||||
e.lock.Lock()
|
||||
if e.command != nil && e.command.ResourceLimits {
|
||||
e.cgLock.Lock()
|
||||
if err := DestroyCgroup(e.groups); err != nil {
|
||||
merr.Errors = append(merr.Errors, err)
|
||||
}
|
||||
e.lock.Unlock()
|
||||
e.cgLock.Unlock()
|
||||
}
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
|
@ -270,10 +315,10 @@ func (e *UniversalExecutor) ShutDown() error {
|
|||
|
||||
// configureTaskDir sets the task dir in the executor
|
||||
func (e *UniversalExecutor) configureTaskDir() error {
|
||||
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.TaskName]
|
||||
taskDir, ok := e.ctx.AllocDir.TaskDirs[e.ctx.Task.Name]
|
||||
e.taskDir = taskDir
|
||||
if !ok {
|
||||
return fmt.Errorf("couldn't find task directory for task %v", e.ctx.TaskName)
|
||||
return fmt.Errorf("couldn't find task directory for task %v", e.ctx.Task.Name)
|
||||
}
|
||||
e.cmd.Dir = taskDir
|
||||
return nil
|
||||
|
@ -299,3 +344,48 @@ func (e *UniversalExecutor) makeExecutablePosix(binPath string) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getFreePort returns a free port ready to be listened on between upper and
|
||||
// lower bounds
|
||||
func (e *UniversalExecutor) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
|
||||
if runtime.GOOS == "windows" {
|
||||
return e.listenerTCP(lowerBound, upperBound)
|
||||
}
|
||||
|
||||
return e.listenerUnix()
|
||||
}
|
||||
|
||||
// listenerTCP creates a TCP listener using an unused port between an upper and
|
||||
// lower bound
|
||||
func (e *UniversalExecutor) listenerTCP(lowerBound uint, upperBound uint) (net.Listener, error) {
|
||||
for i := lowerBound; i <= upperBound; i++ {
|
||||
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l, err := net.ListenTCP("tcp", addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
return nil, fmt.Errorf("No free port found")
|
||||
}
|
||||
|
||||
// listenerUnix creates a Unix domain socket
|
||||
func (e *UniversalExecutor) listenerUnix() (net.Listener, error) {
|
||||
f, err := ioutil.TempFile("", "plugin")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
path := f.Name()
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.Remove(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return net.Listen("unix", path)
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ var (
|
|||
|
||||
func (e *UniversalExecutor) makeExecutable(binPath string) error {
|
||||
path := binPath
|
||||
if e.ctx.FSIsolation {
|
||||
if e.command.FSIsolation {
|
||||
// The path must be relative the chroot
|
||||
path = filepath.Join(e.taskDir, binPath)
|
||||
} else if !filepath.IsAbs(binPath) {
|
||||
|
@ -50,14 +50,14 @@ func (e *UniversalExecutor) makeExecutable(binPath string) error {
|
|||
|
||||
// configureIsolation configures chroot and creates cgroups
|
||||
func (e *UniversalExecutor) configureIsolation() error {
|
||||
if e.ctx.FSIsolation {
|
||||
if e.command.FSIsolation {
|
||||
if err := e.configureChroot(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if e.ctx.ResourceLimits {
|
||||
if err := e.configureCgroups(e.ctx.TaskResources); err != nil {
|
||||
if e.command.ResourceLimits {
|
||||
if err := e.configureCgroups(e.ctx.Task.Resources); err != nil {
|
||||
return fmt.Errorf("error creating cgroups: %v", err)
|
||||
}
|
||||
if err := e.applyLimits(os.Getpid()); err != nil {
|
||||
|
@ -75,7 +75,7 @@ func (e *UniversalExecutor) configureIsolation() error {
|
|||
|
||||
// applyLimits puts a process in a pre-configured cgroup
|
||||
func (e *UniversalExecutor) applyLimits(pid int) error {
|
||||
if !e.ctx.ResourceLimits {
|
||||
if !e.command.ResourceLimits {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -167,11 +167,11 @@ func (e *UniversalExecutor) runAs(userid string) error {
|
|||
// configureChroot configures a chroot
|
||||
func (e *UniversalExecutor) configureChroot() error {
|
||||
allocDir := e.ctx.AllocDir
|
||||
if err := allocDir.MountSharedDir(e.ctx.TaskName); err != nil {
|
||||
if err := allocDir.MountSharedDir(e.ctx.Task.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := allocDir.Embed(e.ctx.TaskName, chrootEnv); err != nil {
|
||||
if err := allocDir.Embed(e.ctx.Task.Name, chrootEnv); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -195,8 +195,8 @@ func (e *UniversalExecutor) configureChroot() error {
|
|||
// should be called when tearing down the task.
|
||||
func (e *UniversalExecutor) removeChrootMounts() error {
|
||||
// Prevent a race between Wait/ForceStop
|
||||
e.lock.Lock()
|
||||
defer e.lock.Unlock()
|
||||
e.cgLock.Lock()
|
||||
defer e.cgLock.Unlock()
|
||||
return e.ctx.AllocDir.UnmountAll()
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
// +build !windows
|
||||
|
||||
package executor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/syslog"
|
||||
|
||||
"github.com/hashicorp/nomad/client/driver/logging"
|
||||
)
|
||||
|
||||
func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) {
|
||||
e.ctx = ctx
|
||||
e.syslogChan = make(chan *logging.SyslogMessage, 2048)
|
||||
l, err := e.getListener(e.ctx.PortLowerBound, e.ctx.PortUpperBound)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String())
|
||||
if err := e.configureLoggers(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.syslogServer = logging.NewSyslogServer(l, e.syslogChan, e.logger)
|
||||
go e.syslogServer.Start()
|
||||
go e.collectLogs(e.lre, e.lro)
|
||||
syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String())
|
||||
return &SyslogServerState{Addr: syslogAddr}, nil
|
||||
}
|
||||
|
||||
func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
|
||||
for logParts := range e.syslogChan {
|
||||
// If the severity of the log line is err then we write to stderr
|
||||
// otherwise all messages go to stdout
|
||||
if logParts.Severity == syslog.LOG_ERR {
|
||||
e.lre.Write(logParts.Message)
|
||||
e.lre.Write([]byte{'\n'})
|
||||
} else {
|
||||
e.lro.Write(logParts.Message)
|
||||
e.lro.Write([]byte{'\n'})
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) {
|
||||
func mockAllocDir(t *testing.T) (*structs.Task, *allocdir.AllocDir) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
|
||||
|
@ -39,18 +39,16 @@ func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) {
|
|||
log.Panicf("allocDir.Build() failed: %v", err)
|
||||
}
|
||||
|
||||
return task.Name, allocDir
|
||||
return task, allocDir
|
||||
}
|
||||
|
||||
func testExecutorContext(t *testing.T) *ExecutorContext {
|
||||
taskEnv := env.NewTaskEnvironment(mock.Node())
|
||||
taskName, allocDir := mockAllocDir(t)
|
||||
task, allocDir := mockAllocDir(t)
|
||||
ctx := &ExecutorContext{
|
||||
TaskEnv: taskEnv,
|
||||
TaskName: taskName,
|
||||
AllocDir: allocDir,
|
||||
TaskResources: constraint,
|
||||
LogConfig: structs.DefaultLogConfig(),
|
||||
TaskEnv: taskEnv,
|
||||
Task: task,
|
||||
AllocDir: allocDir,
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
@ -80,6 +78,9 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
|
|||
if ps.ExitCode < 1 {
|
||||
t.Fatalf("expected exit code to be non zero, actual: %v", ps.ExitCode)
|
||||
}
|
||||
if err := executor.Exit(); err != nil {
|
||||
t.Fatalf("error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecutor_Start_Wait(t *testing.T) {
|
||||
|
@ -98,6 +99,9 @@ func TestExecutor_Start_Wait(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error in waiting for command: %v", err)
|
||||
}
|
||||
if err := executor.Exit(); err != nil {
|
||||
t.Fatalf("error: %v", err)
|
||||
}
|
||||
|
||||
expected := "hello world"
|
||||
file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0")
|
||||
|
@ -119,9 +123,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
|
|||
ctx := testExecutorContext(t)
|
||||
defer ctx.AllocDir.Destroy()
|
||||
|
||||
ctx.FSIsolation = true
|
||||
ctx.ResourceLimits = true
|
||||
ctx.UnprivilegedUser = true
|
||||
execCmd.FSIsolation = true
|
||||
execCmd.ResourceLimits = true
|
||||
execCmd.User = "nobody"
|
||||
|
||||
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
|
||||
ps, err := executor.LaunchCmd(&execCmd, ctx)
|
||||
|
@ -135,6 +139,9 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error in waiting for command: %v", err)
|
||||
}
|
||||
if err := executor.Exit(); err != nil {
|
||||
t.Fatalf("error: %v", err)
|
||||
}
|
||||
|
||||
expected := "hello world"
|
||||
file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0")
|
||||
|
@ -154,13 +161,13 @@ func TestExecutor_DestroyCgroup(t *testing.T) {
|
|||
|
||||
execCmd := ExecCommand{Cmd: "/bin/bash", Args: []string{"-c", "/usr/bin/yes"}}
|
||||
ctx := testExecutorContext(t)
|
||||
ctx.LogConfig.MaxFiles = 1
|
||||
ctx.LogConfig.MaxFileSizeMB = 300
|
||||
ctx.Task.LogConfig.MaxFiles = 1
|
||||
ctx.Task.LogConfig.MaxFileSizeMB = 300
|
||||
defer ctx.AllocDir.Destroy()
|
||||
|
||||
ctx.FSIsolation = true
|
||||
ctx.ResourceLimits = true
|
||||
ctx.UnprivilegedUser = true
|
||||
execCmd.FSIsolation = true
|
||||
execCmd.ResourceLimits = true
|
||||
execCmd.User = "nobody"
|
||||
|
||||
executor := NewExecutor(log.New(os.Stdout, "", log.LstdFlags))
|
||||
ps, err := executor.LaunchCmd(&execCmd, ctx)
|
||||
|
@ -171,7 +178,10 @@ func TestExecutor_DestroyCgroup(t *testing.T) {
|
|||
t.Fatalf("expected process to start and have non zero pid")
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
executor.Exit()
|
||||
if err := executor.Exit(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0")
|
||||
finfo, err := os.Stat(file)
|
||||
if err != nil {
|
||||
|
@ -203,6 +213,9 @@ func TestExecutor_Start_Kill(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error in waiting for command: %v", err)
|
||||
}
|
||||
if err := executor.Exit(); err != nil {
|
||||
t.Fatalf("error: %v", err)
|
||||
}
|
||||
|
||||
file := filepath.Join(ctx.AllocDir.LogDir(), "web.stdout.0")
|
||||
time.Sleep(time.Duration(tu.TestMultiplier()*2) * time.Second)
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
package executor
|
||||
|
||||
func (e *UniversalExecutor) LaunchSyslogServer(ctx *ExecutorContext) (*SyslogServerState, error) {
|
||||
return nil, nil
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"log"
|
||||
"net/rpc"
|
||||
|
||||
|
@ -9,6 +10,14 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// Registering these types since we have to serialize and de-serialize the Task
|
||||
// structs over the wire between drivers and the executor.
|
||||
func init() {
|
||||
gob.Register([]interface{}{})
|
||||
gob.Register(map[string]interface{}{})
|
||||
gob.Register([]map[string]string{})
|
||||
}
|
||||
|
||||
type ExecutorRPC struct {
|
||||
client *rpc.Client
|
||||
}
|
||||
|
@ -19,12 +28,23 @@ type LaunchCmdArgs struct {
|
|||
Ctx *executor.ExecutorContext
|
||||
}
|
||||
|
||||
// LaunchSyslogServerArgs wraps the executor context for the purposes of RPC
|
||||
type LaunchSyslogServerArgs struct {
|
||||
Ctx *executor.ExecutorContext
|
||||
}
|
||||
|
||||
func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) {
|
||||
var ps *executor.ProcessState
|
||||
err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps)
|
||||
return ps, err
|
||||
}
|
||||
|
||||
func (e *ExecutorRPC) LaunchSyslogServer(ctx *executor.ExecutorContext) (*executor.SyslogServerState, error) {
|
||||
var ss *executor.SyslogServerState
|
||||
err := e.client.Call("Plugin.LaunchSyslogServer", LaunchSyslogServerArgs{Ctx: ctx}, &ss)
|
||||
return ss, err
|
||||
}
|
||||
|
||||
func (e *ExecutorRPC) Wait() (*executor.ProcessState, error) {
|
||||
var ps executor.ProcessState
|
||||
err := e.client.Call("Plugin.Wait", new(interface{}), &ps)
|
||||
|
@ -43,6 +63,10 @@ func (e *ExecutorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error {
|
|||
return e.client.Call("Plugin.UpdateLogConfig", logConfig, new(interface{}))
|
||||
}
|
||||
|
||||
func (e *ExecutorRPC) UpdateTask(task *structs.Task) error {
|
||||
return e.client.Call("Plugin.UpdateTask", task, new(interface{}))
|
||||
}
|
||||
|
||||
type ExecutorRPCServer struct {
|
||||
Impl executor.Executor
|
||||
}
|
||||
|
@ -55,6 +79,14 @@ func (e *ExecutorRPCServer) LaunchCmd(args LaunchCmdArgs, ps *executor.ProcessSt
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *ExecutorRPCServer) LaunchSyslogServer(args LaunchSyslogServerArgs, ss *executor.SyslogServerState) error {
|
||||
state, err := e.Impl.LaunchSyslogServer(args.Ctx)
|
||||
if state != nil {
|
||||
*ss = *state
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *ExecutorRPCServer) Wait(args interface{}, ps *executor.ProcessState) error {
|
||||
state, err := e.Impl.Wait()
|
||||
if state != nil {
|
||||
|
@ -75,6 +107,10 @@ func (e *ExecutorRPCServer) UpdateLogConfig(args *structs.LogConfig, resp *inter
|
|||
return e.Impl.UpdateLogConfig(args)
|
||||
}
|
||||
|
||||
func (e *ExecutorRPCServer) UpdateTask(args *structs.Task, resp *interface{}) error {
|
||||
return e.Impl.UpdateTask(args)
|
||||
}
|
||||
|
||||
type ExecutorPlugin struct {
|
||||
logger *log.Logger
|
||||
Impl *ExecutorRPCServer
|
||||
|
|
|
@ -154,14 +154,9 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
TaskName: task.Name,
|
||||
TaskResources: task.Resources,
|
||||
LogConfig: task.LogConfig,
|
||||
FSIsolation: true,
|
||||
UnprivilegedUser: true,
|
||||
ResourceLimits: true,
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
Task: task,
|
||||
}
|
||||
|
||||
absPath, err := GetAbsolutePath("java")
|
||||
|
@ -169,7 +164,13 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx)
|
||||
ps, err := execIntf.LaunchCmd(&executor.ExecCommand{
|
||||
Cmd: absPath,
|
||||
Args: args,
|
||||
FSIsolation: true,
|
||||
ResourceLimits: true,
|
||||
User: cstructs.DefaultUnpriviledgedUser,
|
||||
}, executorCtx)
|
||||
if err != nil {
|
||||
pluginClient.Kill()
|
||||
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
|
||||
|
@ -290,7 +291,7 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *javaHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
h.executor.UpdateTask(task)
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
package logging
|
||||
|
||||
type SyslogServer struct {
|
||||
}
|
||||
|
||||
func (s *SyslogServer) Shutdown() {
|
||||
}
|
||||
|
||||
type SyslogMessage struct {
|
||||
}
|
|
@ -191,11 +191,9 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
TaskName: task.Name,
|
||||
TaskResources: task.Resources,
|
||||
LogConfig: task.LogConfig,
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
Task: task,
|
||||
}
|
||||
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: args[0], Args: args[1:]}, executorCtx)
|
||||
if err != nil {
|
||||
|
@ -292,7 +290,7 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *qemuHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
h.executor.UpdateTask(task)
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
|
|
@ -96,11 +96,9 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
|||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
TaskName: task.Name,
|
||||
TaskResources: task.Resources,
|
||||
LogConfig: task.LogConfig,
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
Task: task,
|
||||
}
|
||||
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
|
||||
if err != nil {
|
||||
|
@ -195,7 +193,7 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *rawExecHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
h.executor.UpdateTask(task)
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
|
|
@ -233,12 +233,9 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
|||
return nil, err
|
||||
}
|
||||
executorCtx := &executor.ExecutorContext{
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
TaskName: task.Name,
|
||||
TaskResources: task.Resources,
|
||||
UnprivilegedUser: false,
|
||||
LogConfig: task.LogConfig,
|
||||
TaskEnv: d.taskEnv,
|
||||
AllocDir: ctx.AllocDir,
|
||||
Task: task,
|
||||
}
|
||||
|
||||
absPath, err := GetAbsolutePath("rkt")
|
||||
|
@ -329,7 +326,7 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult {
|
|||
func (h *rktHandle) Update(task *structs.Task) error {
|
||||
// Store the updated kill timeout.
|
||||
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
|
||||
h.executor.UpdateLogConfig(task.LogConfig)
|
||||
h.executor.UpdateTask(task)
|
||||
|
||||
// Update is not possible
|
||||
return nil
|
||||
|
|
|
@ -6,6 +6,10 @@ import (
|
|||
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultUnpriviledgedUser = "nobody"
|
||||
)
|
||||
|
||||
// WaitResult stores the result of a Wait operation.
|
||||
type WaitResult struct {
|
||||
ExitCode int
|
||||
|
|
Loading…
Reference in New Issue