Merge pull request #5211 from hashicorp/test-porting-08
Port some 0.8 TaskRunner tests
This commit is contained in:
commit
1fa376cac6
|
@ -35,6 +35,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// defaultMaxEvents is the default max capacity for task events on the
|
||||||
|
// task state. Overrideable for testing.
|
||||||
|
defaultMaxEvents = 10
|
||||||
|
|
||||||
// killBackoffBaseline is the baseline time for exponential backoff while
|
// killBackoffBaseline is the baseline time for exponential backoff while
|
||||||
// killing a task.
|
// killing a task.
|
||||||
killBackoffBaseline = 5 * time.Second
|
killBackoffBaseline = 5 * time.Second
|
||||||
|
@ -191,6 +195,10 @@ type TaskRunner struct {
|
||||||
// be accessed via helpers
|
// be accessed via helpers
|
||||||
runLaunched bool
|
runLaunched bool
|
||||||
runLaunchedLock sync.Mutex
|
runLaunchedLock sync.Mutex
|
||||||
|
|
||||||
|
// maxEvents is the capacity of the TaskEvents on the TaskState.
|
||||||
|
// Defaults to defaultMaxEvents but overrideable for testing.
|
||||||
|
maxEvents int
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -267,6 +275,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
|
||||||
waitCh: make(chan struct{}),
|
waitCh: make(chan struct{}),
|
||||||
devicemanager: config.DeviceManager,
|
devicemanager: config.DeviceManager,
|
||||||
driverManager: config.DriverManager,
|
driverManager: config.DriverManager,
|
||||||
|
maxEvents: defaultMaxEvents,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the logger based on the allocation ID
|
// Create the logger based on the allocation ID
|
||||||
|
@ -1023,7 +1032,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append event to slice
|
// Append event to slice
|
||||||
appendTaskEvent(tr.state, event)
|
appendTaskEvent(tr.state, event, tr.maxEvents)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1189,8 +1198,7 @@ func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendTaskEvent updates the task status by appending the new event.
|
// appendTaskEvent updates the task status by appending the new event.
|
||||||
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
|
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent, capacity int) {
|
||||||
const capacity = 10
|
|
||||||
if state.Events == nil {
|
if state.Events == nil {
|
||||||
state.Events = make([]*structs.TaskEvent, 1, capacity)
|
state.Events = make([]*structs.TaskEvent, 1, capacity)
|
||||||
state.Events[0] = event
|
state.Events[0] = event
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
|
||||||
cstate "github.com/hashicorp/nomad/client/state"
|
cstate "github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/client/vaultclient"
|
"github.com/hashicorp/nomad/client/vaultclient"
|
||||||
|
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
|
||||||
mockdriver "github.com/hashicorp/nomad/drivers/mock"
|
mockdriver "github.com/hashicorp/nomad/drivers/mock"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
"github.com/hashicorp/nomad/nomad/mock"
|
"github.com/hashicorp/nomad/nomad/mock"
|
||||||
|
@ -126,12 +127,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
|
||||||
defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||||
|
|
||||||
// Wait for it to be running
|
// Wait for it to be running
|
||||||
testutil.WaitForResult(func() (bool, error) {
|
testWaitForTaskToStart(t, origTR)
|
||||||
ts := origTR.TaskState()
|
|
||||||
return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State)
|
|
||||||
}, func(err error) {
|
|
||||||
t.Fatalf("expected running; got: %v", err)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Cause TR to exit without shutting down task
|
// Cause TR to exit without shutting down task
|
||||||
origTR.Shutdown()
|
origTR.Shutdown()
|
||||||
|
@ -597,16 +593,186 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) {
|
||||||
require.Equal(t, expected, data)
|
require.Equal(t, expected, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// testWaitForTaskToStart waits for the task to or fails the test
|
// TestTaskRunner_SignalFailure asserts that signal errors are properly
|
||||||
func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
|
// propagated from the driver to TaskRunner.
|
||||||
// Wait for the task to start
|
func TestTaskRunner_SignalFailure(t *testing.T) {
|
||||||
testutil.WaitForResult(func() (bool, error) {
|
t.Parallel()
|
||||||
tr.stateLock.RLock()
|
|
||||||
started := !tr.state.StartedAt.IsZero()
|
|
||||||
tr.stateLock.RUnlock()
|
|
||||||
|
|
||||||
return started, nil
|
alloc := mock.Alloc()
|
||||||
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||||
|
task.Driver = "mock_driver"
|
||||||
|
errMsg := "test forcing failure"
|
||||||
|
task.Config = map[string]interface{}{
|
||||||
|
"run_for": "10m",
|
||||||
|
"signal_error": errMsg,
|
||||||
|
}
|
||||||
|
|
||||||
|
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
tr, err := NewTaskRunner(conf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
go tr.Run()
|
||||||
|
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||||
|
|
||||||
|
testWaitForTaskToStart(t, tr)
|
||||||
|
|
||||||
|
require.EqualError(t, tr.Signal(&structs.TaskEvent{}, "SIGINT"), errMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTaskRunner_RestartTask asserts that restarting a task works and emits a
|
||||||
|
// Restarting event.
|
||||||
|
func TestTaskRunner_RestartTask(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
alloc := mock.Alloc()
|
||||||
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
||||||
|
task.Driver = "mock_driver"
|
||||||
|
task.Config = map[string]interface{}{
|
||||||
|
"run_for": "10m",
|
||||||
|
}
|
||||||
|
|
||||||
|
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
tr, err := NewTaskRunner(conf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
go tr.Run()
|
||||||
|
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||||
|
|
||||||
|
testWaitForTaskToStart(t, tr)
|
||||||
|
|
||||||
|
// Restart task. Send a RestartSignal event like check watcher. Restart
|
||||||
|
// handler emits the Restarting event.
|
||||||
|
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason("test")
|
||||||
|
const fail = false
|
||||||
|
tr.Restart(context.Background(), event.Copy(), fail)
|
||||||
|
|
||||||
|
// Wait for it to restart and be running again
|
||||||
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
|
ts := tr.TaskState()
|
||||||
|
if ts.Restarts != 1 {
|
||||||
|
return false, fmt.Errorf("expected 1 restart but found %d\nevents: %s",
|
||||||
|
ts.Restarts, pretty.Sprint(ts.Events))
|
||||||
|
}
|
||||||
|
if ts.State != structs.TaskStateRunning {
|
||||||
|
return false, fmt.Errorf("expected running but received %s", ts.State)
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
t.Fatalf("not started")
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Assert the expected Restarting event was emitted
|
||||||
|
found := false
|
||||||
|
events := tr.TaskState().Events
|
||||||
|
for _, e := range events {
|
||||||
|
if e.Type == structs.TaskRestartSignal {
|
||||||
|
found = true
|
||||||
|
require.Equal(t, event.Time, e.Time)
|
||||||
|
require.Equal(t, event.RestartReason, e.RestartReason)
|
||||||
|
require.Contains(t, e.DisplayMessage, event.RestartReason)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.True(t, found, "restarting task event not found", pretty.Sprint(events))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTaskRunner_CheckWatcher_Restart asserts that when enabled an unhealthy
|
||||||
|
// Consul check will cause a task to restart following restart policy rules.
|
||||||
|
func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
alloc := mock.Alloc()
|
||||||
|
|
||||||
|
// Make the restart policy fail within this test
|
||||||
|
tg := alloc.Job.TaskGroups[0]
|
||||||
|
tg.RestartPolicy.Attempts = 2
|
||||||
|
tg.RestartPolicy.Interval = 1 * time.Minute
|
||||||
|
tg.RestartPolicy.Delay = 10 * time.Millisecond
|
||||||
|
tg.RestartPolicy.Mode = structs.RestartPolicyModeFail
|
||||||
|
|
||||||
|
task := tg.Tasks[0]
|
||||||
|
task.Driver = "mock_driver"
|
||||||
|
task.Config = map[string]interface{}{
|
||||||
|
"run_for": "10m",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the task register a check that fails
|
||||||
|
task.Services[0].Checks[0] = &structs.ServiceCheck{
|
||||||
|
Name: "test-restarts",
|
||||||
|
Type: structs.ServiceCheckTCP,
|
||||||
|
Interval: 50 * time.Millisecond,
|
||||||
|
CheckRestart: &structs.CheckRestart{
|
||||||
|
Limit: 2,
|
||||||
|
Grace: 100 * time.Millisecond,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Replace mock Consul ServiceClient, with the real ServiceClient
|
||||||
|
// backed by a mock consul whose checks are always unhealthy.
|
||||||
|
consulAgent := agentconsul.NewMockAgent()
|
||||||
|
consulAgent.SetStatus("critical")
|
||||||
|
consulClient := agentconsul.NewServiceClient(consulAgent, conf.Logger, true)
|
||||||
|
go consulClient.Run()
|
||||||
|
defer consulClient.Shutdown()
|
||||||
|
|
||||||
|
conf.Consul = consulClient
|
||||||
|
|
||||||
|
tr, err := NewTaskRunner(conf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
expectedEvents := []string{
|
||||||
|
"Received",
|
||||||
|
"Task Setup",
|
||||||
|
"Started",
|
||||||
|
"Restart Signaled",
|
||||||
|
"Terminated",
|
||||||
|
"Restarting",
|
||||||
|
"Started",
|
||||||
|
"Restart Signaled",
|
||||||
|
"Terminated",
|
||||||
|
"Restarting",
|
||||||
|
"Started",
|
||||||
|
"Restart Signaled",
|
||||||
|
"Terminated",
|
||||||
|
"Not Restarting",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bump maxEvents so task events aren't dropped
|
||||||
|
tr.maxEvents = 100
|
||||||
|
|
||||||
|
go tr.Run()
|
||||||
|
defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
|
||||||
|
|
||||||
|
// Wait until the task exits. Don't simply wait for it to run as it may
|
||||||
|
// get restarted and terminated before the test is able to observe it
|
||||||
|
// running.
|
||||||
|
select {
|
||||||
|
case <-tr.WaitCh():
|
||||||
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
||||||
|
require.Fail(t, "timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
state := tr.TaskState()
|
||||||
|
actualEvents := make([]string, len(state.Events))
|
||||||
|
for i, e := range state.Events {
|
||||||
|
actualEvents[i] = string(e.Type)
|
||||||
|
}
|
||||||
|
require.Equal(t, actualEvents, expectedEvents)
|
||||||
|
|
||||||
|
require.Equal(t, structs.TaskStateDead, state.State)
|
||||||
|
require.True(t, state.Failed, pretty.Sprint(state))
|
||||||
|
}
|
||||||
|
|
||||||
|
// testWaitForTaskToStart waits for the task to be running or fails the test
|
||||||
|
func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
|
||||||
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
|
ts := tr.TaskState()
|
||||||
|
return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State)
|
||||||
|
}, func(err error) {
|
||||||
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ var (
|
||||||
"exit_code": hclspec.NewAttr("exit_code", "number", false),
|
"exit_code": hclspec.NewAttr("exit_code", "number", false),
|
||||||
"exit_signal": hclspec.NewAttr("exit_signal", "number", false),
|
"exit_signal": hclspec.NewAttr("exit_signal", "number", false),
|
||||||
"exit_err_msg": hclspec.NewAttr("exit_err_msg", "string", false),
|
"exit_err_msg": hclspec.NewAttr("exit_err_msg", "string", false),
|
||||||
"signal_err": hclspec.NewAttr("signal_err", "string", false),
|
"signal_error": hclspec.NewAttr("signal_error", "string", false),
|
||||||
"driver_ip": hclspec.NewAttr("driver_ip", "string", false),
|
"driver_ip": hclspec.NewAttr("driver_ip", "string", false),
|
||||||
"driver_advertise": hclspec.NewAttr("driver_advertise", "bool", false),
|
"driver_advertise": hclspec.NewAttr("driver_advertise", "bool", false),
|
||||||
"driver_port_map": hclspec.NewAttr("driver_port_map", "string", false),
|
"driver_port_map": hclspec.NewAttr("driver_port_map", "string", false),
|
||||||
|
@ -88,7 +88,7 @@ var (
|
||||||
// capabilities is returned by the Capabilities RPC and indicates what
|
// capabilities is returned by the Capabilities RPC and indicates what
|
||||||
// optional features this driver supports
|
// optional features this driver supports
|
||||||
capabilities = &drivers.Capabilities{
|
capabilities = &drivers.Capabilities{
|
||||||
SendSignals: false,
|
SendSignals: true,
|
||||||
Exec: true,
|
Exec: true,
|
||||||
FSIsolation: drivers.FSIsolationNone,
|
FSIsolation: drivers.FSIsolationNone,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue