485 lines
14 KiB
Go
485 lines
14 KiB
Go
package driver
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mitchellh/mapstructure"
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/client/driver/logging"
|
|
dstructs "github.com/hashicorp/nomad/client/driver/structs"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
|
|
// ShutdownPeriodicAfter is a config key that can be used during tests to
|
|
// "stop" a previously-functioning driver, allowing for testing of periodic
|
|
// drivers and fingerprinters
|
|
ShutdownPeriodicAfter = "test.shutdown_periodic_after"
|
|
|
|
// ShutdownPeriodicDuration is a config option that can be used during tests
|
|
// to "stop" a previously functioning driver after the specified duration
|
|
// (specified in seconds) for testing of periodic drivers and fingerprinters.
|
|
ShutdownPeriodicDuration = "test.shutdown_periodic_duration"
|
|
|
|
mockDriverName = "driver.mock_driver"
|
|
)
|
|
|
|
// MockDriverConfig is the driver configuration for the MockDriver
|
|
type MockDriverConfig struct {
|
|
|
|
// StartErr specifies the error that should be returned when starting the
|
|
// mock driver.
|
|
StartErr string `mapstructure:"start_error"`
|
|
|
|
// StartErrRecoverable marks the error returned is recoverable
|
|
StartErrRecoverable bool `mapstructure:"start_error_recoverable"`
|
|
|
|
// StartBlockFor specifies a duration in which to block Start before
|
|
// returning. Useful for testing the behavior of tasks in pending.
|
|
StartBlockFor time.Duration `mapstructure:"start_block_for"`
|
|
|
|
// KillAfter is the duration after which the mock driver indicates the task
|
|
// has exited after getting the initial SIGINT signal
|
|
KillAfter time.Duration `mapstructure:"kill_after"`
|
|
|
|
// RunFor is the duration for which the fake task runs for. After this
|
|
// period the MockDriver responds to the task running indicating that the
|
|
// task has terminated
|
|
RunFor time.Duration `mapstructure:"run_for"`
|
|
|
|
// ExitCode is the exit code with which the MockDriver indicates the task
|
|
// has exited
|
|
ExitCode int `mapstructure:"exit_code"`
|
|
|
|
// ExitSignal is the signal with which the MockDriver indicates the task has
|
|
// been killed
|
|
ExitSignal int `mapstructure:"exit_signal"`
|
|
|
|
// ExitErrMsg is the error message that the task returns while exiting
|
|
ExitErrMsg string `mapstructure:"exit_err_msg"`
|
|
|
|
// SignalErr is the error message that the task returns if signalled
|
|
SignalErr string `mapstructure:"signal_error"`
|
|
|
|
// DriverIP will be returned as the DriverNetwork.IP from Start()
|
|
DriverIP string `mapstructure:"driver_ip"`
|
|
|
|
// DriverAdvertise will be returned as DriverNetwork.AutoAdvertise from
|
|
// Start().
|
|
DriverAdvertise bool `mapstructure:"driver_advertise"`
|
|
|
|
// DriverPortMap will parse a label:number pair and return it in
|
|
// DriverNetwork.PortMap from Start().
|
|
DriverPortMap string `mapstructure:"driver_port_map"`
|
|
|
|
// StdoutString is the string that should be sent to stdout
|
|
StdoutString string `mapstructure:"stdout_string"`
|
|
|
|
// StdoutRepeat is the number of times the output should be sent.
|
|
StdoutRepeat int `mapstructure:"stdout_repeat"`
|
|
|
|
// StdoutRepeatDur is the duration between repeated outputs.
|
|
StdoutRepeatDur time.Duration `mapstructure:"stdout_repeat_duration"`
|
|
}
|
|
|
|
// MockDriver is a driver which is used for testing purposes
|
|
type MockDriver struct {
|
|
DriverContext
|
|
|
|
cleanupFailNum int
|
|
|
|
// shutdownFingerprintTime is the time up to which the driver will be up
|
|
shutdownFingerprintTime time.Time
|
|
}
|
|
|
|
// NewMockDriver is a factory method which returns a new Mock Driver
|
|
func NewMockDriver(ctx *DriverContext) Driver {
|
|
md := &MockDriver{DriverContext: *ctx}
|
|
|
|
// if the shutdown configuration options are set, start the timer here.
|
|
// This config option defaults to false
|
|
if ctx.config != nil && ctx.config.ReadBoolDefault(ShutdownPeriodicAfter, false) {
|
|
duration, err := ctx.config.ReadInt(ShutdownPeriodicDuration)
|
|
if err != nil {
|
|
errMsg := fmt.Sprintf("unable to read config option for shutdown_periodic_duration %v, got err %s", duration, err.Error())
|
|
panic(errMsg)
|
|
}
|
|
md.shutdownFingerprintTime = time.Now().Add(time.Second * time.Duration(duration))
|
|
}
|
|
|
|
return md
|
|
}
|
|
|
|
func (d *MockDriver) Abilities() DriverAbilities {
|
|
return DriverAbilities{
|
|
SendSignals: false,
|
|
Exec: true,
|
|
}
|
|
}
|
|
|
|
func (d *MockDriver) FSIsolation() cstructs.FSIsolation {
|
|
return cstructs.FSIsolationNone
|
|
}
|
|
|
|
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Start starts the mock driver
|
|
func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error) {
|
|
var driverConfig MockDriverConfig
|
|
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
|
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
|
|
WeaklyTypedInput: true,
|
|
Result: &driverConfig,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := dec.Decode(task.Config); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if driverConfig.StartBlockFor != 0 {
|
|
time.Sleep(driverConfig.StartBlockFor)
|
|
}
|
|
|
|
if driverConfig.StartErr != "" {
|
|
return nil, structs.NewRecoverableError(errors.New(driverConfig.StartErr), driverConfig.StartErrRecoverable)
|
|
}
|
|
|
|
// Create the driver network
|
|
net := &cstructs.DriverNetwork{
|
|
IP: driverConfig.DriverIP,
|
|
AutoAdvertise: driverConfig.DriverAdvertise,
|
|
}
|
|
if raw := driverConfig.DriverPortMap; len(raw) > 0 {
|
|
parts := strings.Split(raw, ":")
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("malformed port map: %q", raw)
|
|
}
|
|
port, err := strconv.Atoi(parts[1])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("malformed port map: %q -- error: %v", raw, err)
|
|
}
|
|
net.PortMap = map[string]int{parts[0]: port}
|
|
}
|
|
|
|
h := mockDriverHandle{
|
|
ctx: ctx,
|
|
task: task,
|
|
taskName: task.Name,
|
|
runFor: driverConfig.RunFor,
|
|
killAfter: driverConfig.KillAfter,
|
|
killTimeout: task.KillTimeout,
|
|
exitCode: driverConfig.ExitCode,
|
|
exitSignal: driverConfig.ExitSignal,
|
|
stdoutString: driverConfig.StdoutString,
|
|
stdoutRepeat: driverConfig.StdoutRepeat,
|
|
stdoutRepeatDur: driverConfig.StdoutRepeatDur,
|
|
logger: m.logger,
|
|
doneCh: make(chan struct{}),
|
|
waitCh: make(chan *dstructs.WaitResult, 1),
|
|
}
|
|
if driverConfig.ExitErrMsg != "" {
|
|
h.exitErr = errors.New(driverConfig.ExitErrMsg)
|
|
}
|
|
if driverConfig.SignalErr != "" {
|
|
h.signalErr = fmt.Errorf(driverConfig.SignalErr)
|
|
}
|
|
m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name)
|
|
go h.run()
|
|
|
|
return &StartResponse{Handle: &h, Network: net}, nil
|
|
}
|
|
|
|
// Cleanup deletes all keys except for Config.Options["cleanup_fail_on"] for
|
|
// Config.Options["cleanup_fail_num"] times. For failures it will return a
|
|
// recoverable error.
|
|
func (m *MockDriver) Cleanup(ctx *ExecContext, res *CreatedResources) error {
|
|
if res == nil {
|
|
panic("Cleanup should not be called with nil *CreatedResources")
|
|
}
|
|
|
|
var err error
|
|
failn, _ := strconv.Atoi(m.config.Options["cleanup_fail_num"])
|
|
failk := m.config.Options["cleanup_fail_on"]
|
|
for k := range res.Resources {
|
|
if k == failk && m.cleanupFailNum < failn {
|
|
m.cleanupFailNum++
|
|
err = structs.NewRecoverableError(fmt.Errorf("mock_driver failure on %q call %d/%d", k, m.cleanupFailNum, failn), true)
|
|
} else {
|
|
delete(res.Resources, k)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Validate validates the mock driver configuration
|
|
func (m *MockDriver) Validate(map[string]interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
// Fingerprint fingerprints a node and returns if MockDriver is enabled
|
|
func (m *MockDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstructs.FingerprintResponse) error {
|
|
switch {
|
|
// If the driver is configured to shut down after a period of time, and the
|
|
// current time is after the time which the node should shut down, simulate
|
|
// driver failure
|
|
case !m.shutdownFingerprintTime.IsZero() && time.Now().After(m.shutdownFingerprintTime):
|
|
resp.RemoveAttribute(mockDriverName)
|
|
default:
|
|
resp.AddAttribute(mockDriverName, "1")
|
|
resp.Detected = true
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// When testing, poll for updates
|
|
func (m *MockDriver) Periodic() (bool, time.Duration) {
|
|
return true, 500 * time.Millisecond
|
|
}
|
|
|
|
// HealthCheck implements the interface for HealthCheck, and indicates the current
|
|
// health status of the mock driver.
|
|
func (m *MockDriver) HealthCheck(req *cstructs.HealthCheckRequest, resp *cstructs.HealthCheckResponse) error {
|
|
switch {
|
|
case !m.shutdownFingerprintTime.IsZero() && time.Now().After(m.shutdownFingerprintTime):
|
|
notHealthy := &structs.DriverInfo{
|
|
Healthy: false,
|
|
HealthDescription: "not running",
|
|
UpdateTime: time.Now(),
|
|
}
|
|
resp.AddDriverInfo("mock_driver", notHealthy)
|
|
return nil
|
|
default:
|
|
healthy := &structs.DriverInfo{
|
|
Healthy: true,
|
|
HealthDescription: "running",
|
|
UpdateTime: time.Now(),
|
|
}
|
|
resp.AddDriverInfo("mock_driver", healthy)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// GetHealthCheckInterval implements the interface for HealthCheck and indicates
|
|
// that mock driver should be checked periodically. Returns a boolean
|
|
// indicating if it should be checked, and the duration at which to do this
|
|
// check.
|
|
func (m *MockDriver) GetHealthCheckInterval(req *cstructs.HealthCheckIntervalRequest, resp *cstructs.HealthCheckIntervalResponse) error {
|
|
resp.Eligible = true
|
|
resp.Period = 1 * time.Second
|
|
return nil
|
|
}
|
|
|
|
// MockDriverHandle is a driver handler which supervises a mock task
|
|
type mockDriverHandle struct {
|
|
ctx *ExecContext
|
|
task *structs.Task
|
|
taskName string
|
|
runFor time.Duration
|
|
killAfter time.Duration
|
|
killTimeout time.Duration
|
|
exitCode int
|
|
exitSignal int
|
|
exitErr error
|
|
signalErr error
|
|
logger *log.Logger
|
|
stdoutString string
|
|
stdoutRepeat int
|
|
stdoutRepeatDur time.Duration
|
|
waitCh chan *dstructs.WaitResult
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
type mockDriverID struct {
|
|
TaskName string
|
|
RunFor time.Duration
|
|
KillAfter time.Duration
|
|
KillTimeout time.Duration
|
|
ExitCode int
|
|
ExitSignal int
|
|
ExitErr error
|
|
SignalErr error
|
|
}
|
|
|
|
func (h *mockDriverHandle) ID() string {
|
|
id := mockDriverID{
|
|
TaskName: h.taskName,
|
|
RunFor: h.runFor,
|
|
KillAfter: h.killAfter,
|
|
KillTimeout: h.killTimeout,
|
|
ExitCode: h.exitCode,
|
|
ExitSignal: h.exitSignal,
|
|
ExitErr: h.exitErr,
|
|
SignalErr: h.signalErr,
|
|
}
|
|
|
|
data, err := json.Marshal(id)
|
|
if err != nil {
|
|
h.logger.Printf("[ERR] driver.mock_driver: failed to marshal ID to JSON: %s", err)
|
|
}
|
|
return string(data)
|
|
}
|
|
|
|
// Open re-connects the driver to the running task
|
|
func (m *MockDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
|
|
id := &mockDriverID{}
|
|
if err := json.Unmarshal([]byte(handleID), id); err != nil {
|
|
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
|
|
}
|
|
|
|
h := mockDriverHandle{
|
|
taskName: id.TaskName,
|
|
runFor: id.RunFor,
|
|
killAfter: id.KillAfter,
|
|
killTimeout: id.KillTimeout,
|
|
exitCode: id.ExitCode,
|
|
exitSignal: id.ExitSignal,
|
|
exitErr: id.ExitErr,
|
|
signalErr: id.SignalErr,
|
|
logger: m.logger,
|
|
doneCh: make(chan struct{}),
|
|
waitCh: make(chan *dstructs.WaitResult, 1),
|
|
}
|
|
|
|
go h.run()
|
|
return &h, nil
|
|
}
|
|
|
|
func (h *mockDriverHandle) WaitCh() chan *dstructs.WaitResult {
|
|
return h.waitCh
|
|
}
|
|
|
|
func (h *mockDriverHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
|
|
h.logger.Printf("[DEBUG] driver.mock: Exec(%q, %q)", cmd, args)
|
|
return []byte(fmt.Sprintf("Exec(%q, %q)", cmd, args)), 0, nil
|
|
}
|
|
|
|
// TODO Implement when we need it.
|
|
func (h *mockDriverHandle) Update(task *structs.Task) error {
|
|
h.killTimeout = task.KillTimeout
|
|
return nil
|
|
}
|
|
|
|
// TODO Implement when we need it.
|
|
func (d *mockDriverHandle) Network() *cstructs.DriverNetwork {
|
|
return nil
|
|
}
|
|
|
|
// TODO Implement when we need it.
|
|
func (h *mockDriverHandle) Signal(s os.Signal) error {
|
|
return h.signalErr
|
|
}
|
|
|
|
// Kill kills a mock task
|
|
func (h *mockDriverHandle) Kill() error {
|
|
h.logger.Printf("[DEBUG] driver.mock: killing task %q after %s or kill timeout: %v", h.taskName, h.killAfter, h.killTimeout)
|
|
select {
|
|
case <-h.doneCh:
|
|
case <-time.After(h.killAfter):
|
|
select {
|
|
case <-h.doneCh:
|
|
// already closed
|
|
default:
|
|
close(h.doneCh)
|
|
}
|
|
case <-time.After(h.killTimeout):
|
|
h.logger.Printf("[DEBUG] driver.mock: terminating task %q", h.taskName)
|
|
select {
|
|
case <-h.doneCh:
|
|
// already closed
|
|
default:
|
|
close(h.doneCh)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO Implement when we need it.
|
|
func (h *mockDriverHandle) Stats() (*cstructs.TaskResourceUsage, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// run waits for the configured amount of time and then indicates the task has
|
|
// terminated
|
|
func (h *mockDriverHandle) run() {
|
|
defer close(h.waitCh)
|
|
|
|
// Setup logging output
|
|
if h.stdoutString != "" {
|
|
go h.handleLogging()
|
|
}
|
|
|
|
timer := time.NewTimer(h.runFor)
|
|
defer timer.Stop()
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
select {
|
|
case <-h.doneCh:
|
|
// already closed
|
|
default:
|
|
close(h.doneCh)
|
|
}
|
|
case <-h.doneCh:
|
|
h.logger.Printf("[DEBUG] driver.mock: finished running task %q", h.taskName)
|
|
h.waitCh <- dstructs.NewWaitResult(h.exitCode, h.exitSignal, h.exitErr)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleLogging handles logging stdout messages
|
|
func (h *mockDriverHandle) handleLogging() {
|
|
if h.stdoutString == "" {
|
|
return
|
|
}
|
|
|
|
// Setup a log rotator
|
|
logFileSize := int64(h.task.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
|
lro, err := logging.NewFileRotator(h.ctx.TaskDir.LogDir, fmt.Sprintf("%v.stdout", h.taskName),
|
|
h.task.LogConfig.MaxFiles, logFileSize, hclog.Default()) //TODO: plumb hclog
|
|
if err != nil {
|
|
h.exitErr = err
|
|
close(h.doneCh)
|
|
h.logger.Printf("[ERR] mock_driver: failed to setup file rotator: %v", err)
|
|
return
|
|
}
|
|
defer lro.Close()
|
|
|
|
// Do initial write to stdout.
|
|
if _, err := io.WriteString(lro, h.stdoutString); err != nil {
|
|
h.exitErr = err
|
|
close(h.doneCh)
|
|
h.logger.Printf("[ERR] mock_driver: failed to write to stdout: %v", err)
|
|
return
|
|
}
|
|
|
|
for i := 0; i < h.stdoutRepeat; i++ {
|
|
select {
|
|
case <-h.doneCh:
|
|
return
|
|
case <-time.After(h.stdoutRepeatDur):
|
|
if _, err := io.WriteString(lro, h.stdoutString); err != nil {
|
|
h.exitErr = err
|
|
close(h.doneCh)
|
|
h.logger.Printf("[ERR] mock_driver: failed to write to stdout: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|