Task runner sends signals
This commit is contained in:
parent
00a1234c55
commit
bc35eaee21
|
@ -234,6 +234,7 @@ type TaskState struct {
|
|||
}
|
||||
|
||||
const (
|
||||
TaskSetupFailure = "Setup Failure"
|
||||
TaskDriverFailure = "Driver Failure"
|
||||
TaskReceived = "Received"
|
||||
TaskFailedValidation = "Failed Validation"
|
||||
|
@ -258,6 +259,7 @@ type TaskEvent struct {
|
|||
Type string
|
||||
Time int64
|
||||
RestartReason string
|
||||
SetupError string
|
||||
DriverError string
|
||||
ExitCode int
|
||||
Signal int
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/hashicorp/consul-template/manager"
|
||||
"github.com/hashicorp/consul-template/signals"
|
||||
"github.com/hashicorp/consul-template/watch"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/client/driver/env"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -29,7 +30,7 @@ type TaskHooks interface {
|
|||
Restart(source, reason string)
|
||||
|
||||
// Signal is used to signal the task
|
||||
Signal(source, reason string, s os.Signal)
|
||||
Signal(source, reason string, s os.Signal) error
|
||||
|
||||
// UnblockStart is used to unblock the starting of the task. This should be
|
||||
// called after prestart work is completed
|
||||
|
@ -277,8 +278,20 @@ func (tm *TaskTemplateManager) run() {
|
|||
if restart {
|
||||
tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
|
||||
} else if len(signals) != 0 {
|
||||
var mErr multierror.Error
|
||||
for signal := range signals {
|
||||
tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
|
||||
err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
|
||||
if err != nil {
|
||||
multierror.Append(&mErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := mErr.ErrorOrNil(); err != nil {
|
||||
flat := make([]os.Signal, 0, len(signals))
|
||||
for signal := range signals {
|
||||
flat = append(flat, tm.signals[signal])
|
||||
}
|
||||
tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,10 +26,14 @@ type MockTaskHooks struct {
|
|||
Signals []os.Signal
|
||||
SignalCh chan struct{}
|
||||
|
||||
// SignalError is returned when Signal is called on the mock hook
|
||||
SignalError error
|
||||
|
||||
UnblockCh chan struct{}
|
||||
Unblocked bool
|
||||
|
||||
KillReason string
|
||||
KillCh chan struct{}
|
||||
}
|
||||
|
||||
func NewMockTaskHooks() *MockTaskHooks {
|
||||
|
@ -37,6 +41,7 @@ func NewMockTaskHooks() *MockTaskHooks {
|
|||
UnblockCh: make(chan struct{}, 1),
|
||||
RestartCh: make(chan struct{}, 1),
|
||||
SignalCh: make(chan struct{}, 1),
|
||||
KillCh: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
func (m *MockTaskHooks) Restart(source, reason string) {
|
||||
|
@ -47,15 +52,24 @@ func (m *MockTaskHooks) Restart(source, reason string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) {
|
||||
func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error {
|
||||
m.Signals = append(m.Signals, s)
|
||||
select {
|
||||
case m.SignalCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
return m.SignalError
|
||||
}
|
||||
|
||||
func (m *MockTaskHooks) Kill(source, reason string) {
|
||||
m.KillReason = reason
|
||||
select {
|
||||
case m.KillCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockTaskHooks) Kill(source, reason string) { m.KillReason = reason }
|
||||
func (m *MockTaskHooks) UnblockStart(source string) {
|
||||
if !m.Unblocked {
|
||||
close(m.UnblockCh)
|
||||
|
@ -673,17 +687,13 @@ func TestTaskTemplateManager_AllRendered_Signal(t *testing.T) {
|
|||
harness.consul.SetKV(key1, []byte(content1_1))
|
||||
|
||||
// Wait for restart
|
||||
timeout := time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second)
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-harness.mockHooks.RestartCh:
|
||||
t.Fatalf("Restart with signal policy: %+v", harness.mockHooks)
|
||||
case <-harness.mockHooks.SignalCh:
|
||||
break OUTER
|
||||
case <-timeout:
|
||||
t.Fatalf("Should have received a signals: %+v", harness.mockHooks)
|
||||
}
|
||||
select {
|
||||
case <-harness.mockHooks.RestartCh:
|
||||
t.Fatalf("Restart with signal policy: %+v", harness.mockHooks)
|
||||
case <-harness.mockHooks.SignalCh:
|
||||
break
|
||||
case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
|
||||
t.Fatalf("Should have received a signals: %+v", harness.mockHooks)
|
||||
}
|
||||
|
||||
// Check the files have been updated
|
||||
|
@ -697,3 +707,40 @@ OUTER:
|
|||
t.Fatalf("Unexpected template data; got %q, want %q", s, content1_1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskTemplateManager_Signal_Error(t *testing.T) {
|
||||
// Make a template that renders based on a key in Consul and sends SIGALRM
|
||||
key1 := "foo"
|
||||
content1_1 := "bar"
|
||||
embedded1 := fmt.Sprintf(`{{key "%s"}}`, key1)
|
||||
file1 := "my.tmpl"
|
||||
template := &structs.Template{
|
||||
EmbeddedTmpl: embedded1,
|
||||
DestPath: file1,
|
||||
ChangeMode: structs.TemplateChangeModeSignal,
|
||||
ChangeSignal: "SIGALRM",
|
||||
}
|
||||
|
||||
// Drop the retry rate
|
||||
testRetryRate = 10 * time.Millisecond
|
||||
|
||||
harness := newTestHarness(t, []*structs.Template{template}, true, true, false)
|
||||
defer harness.stop()
|
||||
|
||||
harness.mockHooks.SignalError = fmt.Errorf("test error")
|
||||
|
||||
// Write the key to Consul
|
||||
harness.consul.SetKV(key1, []byte(content1_1))
|
||||
|
||||
// Wait for kill channel
|
||||
select {
|
||||
case <-harness.mockHooks.KillCh:
|
||||
break
|
||||
case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second):
|
||||
t.Fatalf("Should have received a signals: %+v", harness.mockHooks)
|
||||
}
|
||||
|
||||
if !strings.Contains(harness.mockHooks.KillReason, "Sending signals") {
|
||||
t.Fatalf("Unexpected error", harness.mockHooks.KillReason)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ func init() {
|
|||
gob.Register(map[string]interface{}{})
|
||||
gob.Register([]map[string]string{})
|
||||
gob.Register([]map[string]int{})
|
||||
gob.Register(new(os.Signal))
|
||||
gob.Register(syscall.Signal(0x1))
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,9 @@ type MockDriverConfig struct {
|
|||
|
||||
// ExitErrMsg is the error message that the task returns while exiting
|
||||
ExitErrMsg string `mapstructure:"exit_err_msg"`
|
||||
|
||||
// SignalErr is the error message that the task returns if signalled
|
||||
SignalErr string `mapstructure:"signal_error"`
|
||||
}
|
||||
|
||||
// MockDriver is a driver which is used for testing purposes
|
||||
|
@ -88,6 +91,9 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
|||
if driverConfig.ExitErrMsg != "" {
|
||||
h.exitErr = errors.New(driverConfig.ExitErrMsg)
|
||||
}
|
||||
if driverConfig.SignalErr != "" {
|
||||
h.signalErr = fmt.Errorf(driverConfig.SignalErr)
|
||||
}
|
||||
m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name)
|
||||
go h.run()
|
||||
return &h, nil
|
||||
|
@ -113,6 +119,7 @@ type mockDriverHandle struct {
|
|||
exitCode int
|
||||
exitSignal int
|
||||
exitErr error
|
||||
signalErr error
|
||||
logger *log.Logger
|
||||
waitCh chan *dstructs.WaitResult
|
||||
doneCh chan struct{}
|
||||
|
@ -126,6 +133,7 @@ type mockDriverID struct {
|
|||
ExitCode int
|
||||
ExitSignal int
|
||||
ExitErr error
|
||||
SignalErr error
|
||||
}
|
||||
|
||||
func (h *mockDriverHandle) ID() string {
|
||||
|
@ -137,6 +145,7 @@ func (h *mockDriverHandle) ID() string {
|
|||
ExitCode: h.exitCode,
|
||||
ExitSignal: h.exitSignal,
|
||||
ExitErr: h.exitErr,
|
||||
SignalErr: h.signalErr,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(id)
|
||||
|
@ -161,6 +170,7 @@ func (m *MockDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro
|
|||
exitCode: id.ExitCode,
|
||||
exitSignal: id.ExitSignal,
|
||||
exitErr: id.ExitErr,
|
||||
signalErr: id.SignalErr,
|
||||
logger: m.logger,
|
||||
doneCh: make(chan struct{}),
|
||||
waitCh: make(chan *dstructs.WaitResult, 1),
|
||||
|
@ -181,7 +191,7 @@ func (h *mockDriverHandle) Update(task *structs.Task) error {
|
|||
|
||||
// TODO Implement when we need it.
|
||||
func (h *mockDriverHandle) Signal(s os.Signal) error {
|
||||
return nil
|
||||
return h.signalErr
|
||||
}
|
||||
|
||||
// Kill kills a mock task
|
||||
|
|
|
@ -115,8 +115,14 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
|
|||
|
||||
// SignalEvent is a tuple of the signal and the event generating it, together
// with a channel on which the outcome of delivering the signal is reported.
type SignalEvent struct {
	// s is the signal to be sent
	s os.Signal

	// e is the task event generating the signal
	e *structs.TaskEvent

	// result should be used to send back the result of the signal. It is
	// send-only from the holder's side: the error (or nil) returned by the
	// signal attempt is written here for the requester to read.
	result chan<- error
}
|
||||
|
||||
// NewTaskRunner is used to create a new task context
|
||||
|
@ -358,7 +364,7 @@ func (r *TaskRunner) prestart(taskDir string) (success bool) {
|
|||
r.config, r.vaultToken, taskDir, r.taskEnv)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("failed to build task's template manager: %v", err)
|
||||
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
|
||||
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
|
||||
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
|
||||
return
|
||||
}
|
||||
|
@ -512,7 +518,8 @@ func (r *TaskRunner) run() {
|
|||
r.logger.Printf("[DEBUG] client: task being signalled with %v: %s", se.s, se.e.TaskSignalReason)
|
||||
r.setState(structs.TaskStateRunning, se.e)
|
||||
|
||||
// TODO need an interface on the driver
|
||||
res := r.handle.Signal(se.s)
|
||||
se.result <- res
|
||||
|
||||
case event := <-r.restartCh:
|
||||
r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
|
||||
|
@ -803,7 +810,7 @@ func (r *TaskRunner) Restart(source, reason string) {
|
|||
}
|
||||
|
||||
// Signal will send a signal to the task
|
||||
func (r *TaskRunner) Signal(source, reason string, s os.Signal) {
|
||||
func (r *TaskRunner) Signal(source, reason string, s os.Signal) error {
|
||||
|
||||
reasonStr := fmt.Sprintf("%s: %s", source, reason)
|
||||
event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetTaskSignalReason(reasonStr)
|
||||
|
@ -817,13 +824,21 @@ func (r *TaskRunner) Signal(source, reason string, s os.Signal) {
|
|||
// Drop the signal event
|
||||
if !running {
|
||||
r.logger.Printf("[DEBUG] client: skipping signal since task isn't running")
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
resCh := make(chan error)
|
||||
se := SignalEvent{
|
||||
s: s,
|
||||
e: event,
|
||||
result: resCh,
|
||||
}
|
||||
select {
|
||||
case r.signalCh <- SignalEvent{s: s, e: event}:
|
||||
case r.signalCh <- se:
|
||||
case <-r.waitCh:
|
||||
}
|
||||
|
||||
return <-resCh
|
||||
}
|
||||
|
||||
// Kill will kill a task and store the error, no longer restarting the task
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -595,3 +596,25 @@ func TestTaskRunner_KillTask(t *testing.T) {
|
|||
t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_SignalFailure(t *testing.T) {
|
||||
alloc := mock.Alloc()
|
||||
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Driver = "mock_driver"
|
||||
task.Config = map[string]interface{}{
|
||||
"exit_code": "0",
|
||||
"run_for": "10s",
|
||||
"signal_error": "test forcing failure",
|
||||
}
|
||||
|
||||
_, tr := testTaskRunnerFromAlloc(false, alloc)
|
||||
tr.MarkReceived()
|
||||
go tr.Run()
|
||||
defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
||||
defer tr.ctx.AllocDir.Destroy()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if err := tr.Signal("test", "test", syscall.SIGINT); err == nil {
|
||||
t.Fatalf("Didn't receive error")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,6 +284,12 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
|
|||
} else {
|
||||
desc = "Validation of task failed"
|
||||
}
|
||||
case api.TaskSetupFailure:
|
||||
if event.SetupError != "" {
|
||||
desc = event.SetupError
|
||||
} else {
|
||||
desc = "Task setup failed"
|
||||
}
|
||||
case api.TaskDriverFailure:
|
||||
if event.DriverError != "" {
|
||||
desc = event.DriverError
|
||||
|
@ -299,7 +305,9 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
|
|||
desc = "Failed to download artifacts"
|
||||
}
|
||||
case api.TaskKilling:
|
||||
if event.KillTimeout != 0 {
|
||||
if event.KillReason != "" {
|
||||
desc = fmt.Sprintf("Killing task: %v", event.KillReason)
|
||||
} else if event.KillTimeout != 0 {
|
||||
desc = fmt.Sprintf("Sent interrupt. Waiting %v before force killing", event.KillTimeout)
|
||||
} else {
|
||||
desc = "Sent interrupt"
|
||||
|
|
|
@ -2339,7 +2339,7 @@ func (ts *TaskState) Failed() bool {
|
|||
|
||||
switch ts.Events[l-1].Type {
|
||||
case TaskDiskExceeded, TaskNotRestarting, TaskArtifactDownloadFailed,
|
||||
TaskFailedValidation, TaskVaultRenewalFailed:
|
||||
TaskFailedValidation, TaskVaultRenewalFailed, TaskSetupFailure:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
@ -2362,6 +2362,10 @@ func (ts *TaskState) Successful() bool {
|
|||
}
|
||||
|
||||
const (
|
||||
// TaskSetupFailure indicates that the task could not be started due to a
|
||||
// setup failure.
|
||||
TaskSetupFailure = "Setup Failure"
|
||||
|
||||
// TaskDriverFailure indicates that the task could not be started due to a
|
||||
// failure in the driver.
|
||||
TaskDriverFailure = "Driver Failure"
|
||||
|
@ -2430,6 +2434,9 @@ type TaskEvent struct {
|
|||
// Restart fields.
|
||||
RestartReason string
|
||||
|
||||
// Setup Failure fields.
|
||||
SetupError string
|
||||
|
||||
// Driver Failure fields.
|
||||
DriverError string // A driver error occurred while starting the task.
|
||||
|
||||
|
@ -2496,6 +2503,13 @@ func NewTaskEvent(event string) *TaskEvent {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *TaskEvent) SetSetupError(err error) *TaskEvent {
|
||||
if err != nil {
|
||||
e.SetupError = err.Error()
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *TaskEvent) SetDriverError(err error) *TaskEvent {
|
||||
if err != nil {
|
||||
e.DriverError = err.Error()
|
||||
|
|
Loading…
Reference in New Issue