package client

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"syscall"
	"testing"
	"time"

	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/driver"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"

	ctestutil "github.com/hashicorp/nomad/client/testutil"
)

func testLogger() *log.Logger {
	return prefixedTestLogger("")
}

func prefixedTestLogger(prefix string) *log.Logger {
	return log.New(os.Stderr, prefix, log.LstdFlags)
}
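
// MockTaskStateUpdater records the task state and events reported by a
// TaskRunner so tests can assert on the sequence of transitions.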
type MockTaskStateUpdater struct {
	state  string
	failed bool
	events []*structs.TaskEvent
}

func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) {
	if state != "" {
		m.state = state
	}
	if event != nil {
		if event.FailsTask {
			m.failed = true
		}
		m.events = append(m.events, event)
	}
}

func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
	return testTaskRunnerFromAlloc(restarts, mock.Alloc())
}

// testTaskRunnerFromAlloc creates a mock task runner using the first task in
// the first task group of the passed allocation.
func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTaskStateUpdater, *TaskRunner) {
	logger := testLogger()
	conf := config.DefaultConfig()
	conf.StateDir = os.TempDir()
	conf.AllocDir = os.TempDir()
	upd := &MockTaskStateUpdater{}
	task := alloc.Job.TaskGroups[0].Tasks[0]

	// Initialize the port listing. This should be done by the offer process
	// but we have a mock so that doesn't happen.
	task.Resources.Networks[0].ReservedPorts = []structs.Port{{Label: "", Value: 80}}

	allocDir := allocdir.NewAllocDir(filepath.Join(conf.AllocDir, alloc.ID))
	allocDir.Build([]*structs.Task{task})

	vclient := vaultclient.NewMockVaultClient()
	ctx := driver.NewExecContext(allocDir, alloc.ID)
	tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc, task, vclient)
	if !restarts {
		tr.restartTracker = noRestartsTracker()
	}
	return upd, tr
}
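
// TestTaskRunner_SimpleRun runs a task to completion and verifies the
// terminal state and the Received/Started/Terminated event sequence.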
func TestTaskRunner_SimpleRun(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, tr := testTaskRunner(false)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 3 {
		t.Fatalf("should have 3 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskTerminated {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
	}
}
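
// TestTaskRunner_Run_RecoverableStartError verifies that a recoverable
// driver start error causes the task to be restarted rather than failed.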
func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code":               0,
		"start_error":             "driver failure",
		"start_error_recoverable": true,
	}

	upd, tr := testTaskRunnerFromAlloc(true, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l < 3 {
			return false, fmt.Errorf("Expect at least three events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskDriverFailure {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDriverFailure)
		}

		if upd.events[2].Type != structs.TaskRestarting {
			return false, fmt.Errorf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestarting)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
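
// TestTaskRunner_Destroy verifies that destroying a running task kills it,
// emits the Killing/Killed events, and that stats were being collected
// while it ran.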
func TestTaskRunner_Destroy(t *testing.T) {
	ctestutil.ExecCompatible(t)
	upd, tr := testTaskRunner(true)
	tr.MarkReceived()
	defer tr.ctx.AllocDir.Destroy()

	// Change command to ensure we run for a bit
	tr.task.Config["command"] = "/bin/sleep"
	tr.task.Config["args"] = []string{"1000"}
	go tr.Run()

	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 2 {
			return false, fmt.Errorf("Expect two events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Make sure we are collecting a few stats
	time.Sleep(2 * time.Second)
	stats := tr.LatestResourceUsage()
	if len(stats.Pids) == 0 || stats.ResourceUsage == nil || stats.ResourceUsage.MemoryStats.RSS == 0 {
		t.Fatalf("expected task runner to have some stats")
	}

	// Begin the tear down
	tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 4 {
		t.Fatalf("should have 4 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[2].Type != structs.TaskKilling {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilling)
	}

	if upd.events[3].Type != structs.TaskKilled {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled)
	}
}
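
// TestTaskRunner_Update verifies that updating the allocation copies the new
// task and restart policy into the runner (without sharing pointers) and
// updates the driver handle.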
func TestTaskRunner_Update(t *testing.T) {
	ctestutil.ExecCompatible(t)
	_, tr := testTaskRunner(false)

	// Change command to ensure we run for a bit
	tr.task.Config["command"] = "/bin/sleep"
	tr.task.Config["args"] = []string{"100"}
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	// Update the task definition
	updateAlloc := tr.alloc.Copy()

	// Update the restart policy
	newTG := updateAlloc.Job.TaskGroups[0]
	newMode := "foo"
	newTG.RestartPolicy.Mode = newMode

	newTask := updateAlloc.Job.TaskGroups[0].Tasks[0]
	newTask.Driver = "foobar"

	// Wait for the task to start before updating the kill timeout
	testutil.WaitForResult(func() (bool, error) {
		if tr.handle == nil {
			return false, fmt.Errorf("task not started")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	oldHandle := tr.handle.ID()
	newTask.KillTimeout = time.Hour

	tr.Update(updateAlloc)

	// Wait for update to take place
	testutil.WaitForResult(func() (bool, error) {
		if tr.task == newTask {
			return false, fmt.Errorf("We copied the pointer! This would be very bad")
		}
		if tr.task.Driver != newTask.Driver {
			return false, fmt.Errorf("Task not copied")
		}
		if tr.restartTracker.policy.Mode != newMode {
			return false, fmt.Errorf("restart policy not updated")
		}
		if tr.handle.ID() == oldHandle {
			return false, fmt.Errorf("handle not updated")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
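
// TestTaskRunner_SaveRestoreState snapshots a running task, restores it into
// a new TaskRunner, and verifies the Vault token is recovered.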
func TestTaskRunner_SaveRestoreState(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "5s",
	}

	// Give it a Vault token
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))

	// Wait for the task to be running and then snapshot the state
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 2 {
			return false, fmt.Errorf("Expect two events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	if err := tr.SaveState(); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Read the token from the file system
	secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
	if err != nil {
		t.Fatalf("failed to determine task %s secret dir: %v", task.Name, err)
	}

	tokenPath := filepath.Join(secretDir, vaultTokenFile)
	data, err := ioutil.ReadFile(tokenPath)
	if err != nil {
		t.Fatalf("Failed to read file: %v", err)
	}
	token := string(data)
	if len(token) == 0 {
		t.Fatalf("Token not written to disk")
	}

	// Create a new task runner
	tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update,
		tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.vaultClient)
	tr2.restartTracker = noRestartsTracker()
	if err := tr2.RestoreState(); err != nil {
		t.Fatalf("err: %v", err)
	}
	go tr2.Run()
	defer tr2.Destroy(structs.NewTaskEvent(structs.TaskKilled))

	// Wait for the restored task to exit
	select {
	case <-tr2.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	// Check that we recovered the token
	if act := tr2.vaultFuture.Get(); act != token {
		t.Fatalf("Vault token not properly recovered")
	}
}
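
// TestTaskRunner_Download_List verifies that all artifacts in a task's list
// are downloaded into the task directory before the task starts.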
func TestTaskRunner_Download_List(t *testing.T) {
	ctestutil.ExecCompatible(t)

	// Serve this package's directory so the test files can be fetched as
	// artifacts over HTTP.
	ts := httptest.NewServer(http.FileServer(http.Dir(".")))
	defer ts.Close()

	// Create an allocation that has a task with a list of artifacts.
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	f1 := "task_runner_test.go"
	f2 := "task_runner.go"
	artifact1 := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
	}
	artifact2 := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2),
	}
	task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 4 {
		t.Fatalf("should have 4 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskDownloadingArtifacts {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts)
	}

	if upd.events[2].Type != structs.TaskStarted {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskStarted)
	}

	if upd.events[3].Type != structs.TaskTerminated {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskTerminated)
	}

	// Check that both files exist.
	taskDir := tr.ctx.AllocDir.TaskDirs[task.Name]
	if _, err := os.Stat(filepath.Join(taskDir, f1)); err != nil {
		t.Fatalf("%v not downloaded", f1)
	}
	if _, err := os.Stat(filepath.Join(taskDir, f2)); err != nil {
		t.Fatalf("%v not downloaded", f2)
	}
}
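
// TestTaskRunner_Download_Retries verifies that a failed artifact download
// is retried per the restart policy and eventually fails the task.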
func TestTaskRunner_Download_Retries(t *testing.T) {
	ctestutil.ExecCompatible(t)

	// Create an allocation that has a task with bad artifacts.
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	artifact := structs.TaskArtifact{
		GetterSource: "http://127.1.1.111:12315/foo/bar/baz",
	}
	task.Artifacts = []*structs.TaskArtifact{&artifact}

	// Make the restart policy allow a single restart attempt
	alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
		Attempts: 1,
		Interval: 10 * time.Minute,
		Delay:    1 * time.Second,
		Mode:     structs.RestartPolicyModeFail,
	}

	upd, tr := testTaskRunnerFromAlloc(true, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 7 {
		t.Fatalf("should have 7 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskDownloadingArtifacts {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts)
	}

	if upd.events[2].Type != structs.TaskArtifactDownloadFailed {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskArtifactDownloadFailed)
	}

	if upd.events[3].Type != structs.TaskRestarting {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskRestarting)
	}

	if upd.events[4].Type != structs.TaskDownloadingArtifacts {
		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskDownloadingArtifacts)
	}

	if upd.events[5].Type != structs.TaskArtifactDownloadFailed {
		t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskArtifactDownloadFailed)
	}

	if upd.events[6].Type != structs.TaskNotRestarting {
		t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskNotRestarting)
	}
}
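
// TestTaskRunner_Validate_UserEnforcement verifies that task validation
// rejects blacklisted users (such as root) for the exec driver but allows
// them for docker.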
func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
	_, tr := testTaskRunner(false)
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	if err := tr.setTaskEnv(); err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Try to run as root with exec.
	tr.task.Driver = "exec"
	tr.task.User = "root"
	if err := tr.validateTask(); err == nil {
		t.Fatalf("expected error running as root with exec")
	}

	// Try to run a non-blacklisted user with exec.
	tr.task.Driver = "exec"
	tr.task.User = "foobar"
	if err := tr.validateTask(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Try to run as root with docker.
	tr.task.Driver = "docker"
	tr.task.User = "root"
	if err := tr.validateTask(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
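
// TestTaskRunner_RestartTask verifies that a user-initiated restart kills
// and restarts the task, and that a subsequent kill terminates it.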
func TestTaskRunner_RestartTask(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}

	upd, tr := testTaskRunnerFromAlloc(true, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	go func() {
		time.Sleep(time.Duration(testutil.TestMultiplier()*300) * time.Millisecond)
		tr.Restart("test", "restart")
		time.Sleep(time.Duration(testutil.TestMultiplier()*300) * time.Millisecond)
		tr.Kill("test", "restart", false)
	}()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 9 {
		t.Fatalf("should have 9 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskRestartSignal {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestartSignal)
	}

	if upd.events[3].Type != structs.TaskKilling {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
	}

	if upd.events[4].Type != structs.TaskKilled {
		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
	}

	t.Logf("%+v", upd.events[5])
	if upd.events[5].Type != structs.TaskRestarting {
		t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskRestarting)
	}

	if upd.events[6].Type != structs.TaskStarted {
		t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskStarted)
	}
	if upd.events[7].Type != structs.TaskKilling {
		t.Fatalf("Eighth Event was %v; want %v", upd.events[7].Type, structs.TaskKilling)
	}

	if upd.events[8].Type != structs.TaskKilled {
		t.Fatalf("Ninth Event was %v; want %v", upd.events[8].Type, structs.TaskKilled)
	}
}
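
// TestTaskRunner_KillTask verifies that killing a task marks it dead and
// failed, with the expected Killing/Killed events.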
func TestTaskRunner_KillTask(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	go func() {
		time.Sleep(100 * time.Millisecond)
		tr.Kill("test", "kill", true)
	}()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 4 {
		t.Fatalf("should have 4 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if !upd.failed {
		t.Fatalf("TaskState should be failed: %+v", upd)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskKilling {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilling)
	}

	if upd.events[3].Type != structs.TaskKilled {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled)
	}
}
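
// TestTaskRunner_SignalFailure verifies that a driver error while signaling
// the task is returned to the caller.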
func TestTaskRunner_SignalFailure(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code":    "0",
		"run_for":      "10s",
		"signal_error": "test forcing failure",
	}

	_, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	go tr.Run()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	time.Sleep(100 * time.Millisecond)
	if err := tr.Signal("test", "test", syscall.SIGINT); err == nil {
		t.Fatalf("Didn't receive error")
	}
}
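
// TestTaskRunner_BlockForVault verifies that a task with a Vault stanza
// stays pending until a token is derived, then runs to completion and
// writes the token to its secret directory.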
func TestTaskRunner_BlockForVault(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	// Control when we get a Vault token
	token := "1234"
	waitCh := make(chan struct{})
	handler := func(*structs.Allocation, []string) (map[string]string, error) {
		<-waitCh
		return map[string]string{task.Name: token}, nil
	}
	tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler

	go tr.Run()

	select {
	case <-tr.WaitCh():
		t.Fatalf("premature exit")
	case <-time.After(1 * time.Second):
	}

	if len(upd.events) != 1 {
		t.Fatalf("should have 1 update: %#v", upd.events)
	}

	if upd.state != structs.TaskStatePending {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStatePending)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	// Unblock
	close(waitCh)

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 3 {
		t.Fatalf("should have 3 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskTerminated {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
	}

	// Check that the token is on disk
	secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
	if err != nil {
		t.Fatalf("failed to determine task %s secret dir: %v", task.Name, err)
	}

	// Read the token from the file system
	tokenPath := filepath.Join(secretDir, vaultTokenFile)
	data, err := ioutil.ReadFile(tokenPath)
	if err != nil {
		t.Fatalf("Failed to read file: %v", err)
	}

	if act := string(data); act != token {
		t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
	}
}
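
// TestTaskRunner_DeriveToken_Retry verifies that a recoverable token
// derivation error is retried and the task still runs to completion.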
func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	// Fail the first token derivation with a recoverable error, then succeed
	token := "1234"
	count := 0
	handler := func(*structs.Allocation, []string) (map[string]string, error) {
		if count > 0 {
			return map[string]string{task.Name: token}, nil
		}

		count++
		return nil, structs.NewRecoverableError(fmt.Errorf("Want a retry"), true)
	}
	tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
	go tr.Run()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 3 {
		t.Fatalf("should have 3 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskTerminated {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
	}

	// Check that the token is on disk
	secretDir, err := tr.ctx.AllocDir.GetSecretDir(task.Name)
	if err != nil {
		t.Fatalf("failed to determine task %s secret dir: %v", task.Name, err)
	}

	// Read the token from the file system
	tokenPath := filepath.Join(secretDir, vaultTokenFile)
	data, err := ioutil.ReadFile(tokenPath)
	if err != nil {
		t.Fatalf("Failed to read file: %v", err)
	}

	if act := string(data); act != token {
		t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
	}
}
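
// TestTaskRunner_DeriveToken_Unrecoverable verifies that an unrecoverable
// token derivation error kills the task.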
func TestTaskRunner_DeriveToken_Unrecoverable(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	task.Vault = &structs.Vault{
		Policies:   []string{"default"},
		ChangeMode: structs.VaultChangeModeRestart,
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	// Error the token derivation
	vc := tr.vaultClient.(*vaultclient.MockVaultClient)
	vc.SetDeriveTokenError(alloc.ID, []string{task.Name}, fmt.Errorf("Non recoverable"))
	go tr.Run()

	// Wait for the task to be killed
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 2 {
			return false, fmt.Errorf("Expect two events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskKilling {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskKilling)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
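
// TestTaskRunner_Template_Block verifies that a task with templates stays
// pending until the templates are rendered.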
func TestTaskRunner_Template_Block(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	task.Templates = []*structs.Template{
		{
			EmbeddedTmpl: "{{key \"foo\"}}",
			DestPath:     "local/test",
			ChangeMode:   structs.TemplateChangeModeNoop,
		},
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	go tr.Run()

	select {
	case <-tr.WaitCh():
		t.Fatalf("premature exit")
	case <-time.After(1 * time.Second):
	}

	if len(upd.events) != 1 {
		t.Fatalf("should have 1 update: %#v", upd.events)
	}

	if upd.state != structs.TaskStatePending {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStatePending)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	// Unblock
	tr.UnblockStart("test")

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 3 {
		t.Fatalf("should have 3 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskStarted {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
	}

	if upd.events[2].Type != structs.TaskTerminated {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskTerminated)
	}
}
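
// TestTaskRunner_Template_Artifact verifies that a template can be rendered
// from a file that was downloaded as an artifact.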
func TestTaskRunner_Template_Artifact(t *testing.T) {
	dir, err := os.Getwd()
	if err != nil {
		t.Fatalf("bad: %v", err)
	}

	ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Join(dir, ".."))))
	defer ts.Close()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	// Create an allocation that has a task that renders a template from an
	// artifact
	f1 := "CHANGELOG.md"
	artifact := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
	}
	task.Artifacts = []*structs.TaskArtifact{&artifact}
	task.Templates = []*structs.Template{
		{
			SourcePath: "CHANGELOG.md",
			DestPath:   "local/test",
			ChangeMode: structs.TemplateChangeModeNoop,
		},
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()

	go tr.Run()

	select {
	case <-tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(upd.events) != 4 {
		t.Fatalf("should have 4 updates: %#v", upd.events)
	}

	if upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
	}

	if upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
	}

	if upd.events[1].Type != structs.TaskDownloadingArtifacts {
		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts)
	}

	if upd.events[2].Type != structs.TaskStarted {
		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskStarted)
	}

	if upd.events[3].Type != structs.TaskTerminated {
		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskTerminated)
	}

	// Check that the artifact was downloaded and the template rendered.
	taskDir := tr.ctx.AllocDir.TaskDirs[task.Name]
	if _, err := os.Stat(filepath.Join(taskDir, f1)); err != nil {
		t.Fatalf("%v not downloaded", f1)
	}
	if _, err := os.Stat(filepath.Join(taskDir, allocdir.TaskLocal, "test")); err != nil {
		t.Fatalf("template not rendered")
	}
}
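
// TestTaskRunner_Template_NewVaultToken verifies that a failed token renewal
// derives a new token and rebuilds the template manager with it.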
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	task.Templates = []*structs.Template{
		{
			EmbeddedTmpl: "{{key \"foo\"}}",
			DestPath:     "local/test",
			ChangeMode:   structs.TemplateChangeModeNoop,
		},
	}
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	_, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()
	go tr.Run()

	// Wait for a Vault token
	var token string
	testutil.WaitForResult(func() (bool, error) {
		if token = tr.vaultFuture.Get(); token == "" {
			return false, fmt.Errorf("No Vault token")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Error the token renewal
	vc := tr.vaultClient.(*vaultclient.MockVaultClient)
	renewalCh, ok := vc.RenewTokens[token]
	if !ok {
		t.Fatalf("no renewal channel")
	}

	originalManager := tr.templateManager

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// Wait for a new Vault token
	var token2 string
	testutil.WaitForResult(func() (bool, error) {
		if token2 = tr.vaultFuture.Get(); token2 == "" || token2 == token {
			return false, fmt.Errorf("No new Vault token")
		}

		if originalManager == tr.templateManager {
			return false, fmt.Errorf("Template manager not updated")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
|
|
|
|
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	task.Vault = &structs.Vault{
		Policies:   []string{"default"},
		ChangeMode: structs.VaultChangeModeRestart,
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()
	go tr.Run()

	// Wait for the task to start
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 2 {
			return false, fmt.Errorf("Expect two events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Error the token renewal
	vc := tr.vaultClient.(*vaultclient.MockVaultClient)
	renewalCh, ok := vc.RenewTokens[tr.vaultFuture.Get()]
	if !ok {
		t.Fatalf("no renewal channel")
	}

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// Ensure a restart
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 7 {
			return false, fmt.Errorf("Expect seven events; got %#v", upd.events)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		if upd.events[2].Type != structs.TaskRestartSignal {
			return false, fmt.Errorf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestartSignal)
		}

		if upd.events[3].Type != structs.TaskKilling {
			return false, fmt.Errorf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
		}

		if upd.events[4].Type != structs.TaskKilled {
			return false, fmt.Errorf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
		}

		if upd.events[5].Type != structs.TaskRestarting {
			return false, fmt.Errorf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskRestarting)
		}

		if upd.events[6].Type != structs.TaskStarted {
			return false, fmt.Errorf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskStarted)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
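
// TestTaskRunner_VaultManager_Signal verifies that a failed token renewal
// signals the task when the Vault change mode is signal.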
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	task.Vault = &structs.Vault{
		Policies:     []string{"default"},
		ChangeMode:   structs.VaultChangeModeSignal,
		ChangeSignal: "SIGUSR1",
	}

	upd, tr := testTaskRunnerFromAlloc(false, alloc)
	tr.MarkReceived()
	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tr.ctx.AllocDir.Destroy()
	go tr.Run()

	// Wait for the task to start
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 2 {
			return false, fmt.Errorf("Expect two events; got %v", l)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Error the token renewal
	vc := tr.vaultClient.(*vaultclient.MockVaultClient)
	renewalCh, ok := vc.RenewTokens[tr.vaultFuture.Get()]
	if !ok {
		t.Fatalf("no renewal channel")
	}

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// Ensure the task was signaled
	testutil.WaitForResult(func() (bool, error) {
		if l := len(upd.events); l != 3 {
			return false, fmt.Errorf("Expect three events; got %#v", upd.events)
		}

		if upd.events[0].Type != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
		}

		if upd.events[1].Type != structs.TaskStarted {
			return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
		}

		if upd.events[2].Type != structs.TaskSignaling {
			return false, fmt.Errorf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskSignaling)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}