// Source: open-nomad/client/allocrunner/taskrunner/task_runner_test.go
// (2031 lines, 57 KiB, Go)

package taskrunner
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strings"
"syscall"
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
)
// noRestartsTracker builds a restart tracker for a batch job whose policy
// permits zero attempts in fail mode, i.e. a tracker that never restarts.
func noRestartsTracker() *restarts.RestartTracker {
	return restarts.NewRestartTracker(
		&structs.RestartPolicy{
			Mode:     structs.RestartPolicyModeFail,
			Attempts: 0,
		},
		structs.JobTypeBatch,
	)
}
// MockTaskStateUpdater records the task state updates it receives through
// Update so tests can inspect them afterwards.
type MockTaskStateUpdater struct {
// state is the most recent non-empty state passed to Update.
state string
// failed is set once any recorded event has FailsTask set.
failed bool
// events holds every non-nil event passed to Update, in arrival order.
events []*structs.TaskEvent
}
// Update records a state transition and/or an event on the mock.
//
// A non-empty state overwrites the stored state. A non-nil event is appended
// to the recorded events and, if it has FailsTask set, marks the mock as
// failed. The name argument and the trailing bool are accepted but unused.
func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) {
	if len(state) > 0 {
		m.state = state
	}

	// Nothing more to record when no event accompanies the update.
	if event == nil {
		return
	}

	m.failed = m.failed || event.FailsTask
	m.events = append(m.events, event)
}
// String renders the recorded state, failed flag and every recorded event
// (in %#v form, one per line) for debugging output.
func (m *MockTaskStateUpdater) String() string {
	pieces := make([]string, 0, len(m.events)+1)
	pieces = append(pieces, fmt.Sprintf("Updates:\n state=%q\n failed=%t\n events=\n", m.state, m.failed))
	for _, ev := range m.events {
		pieces = append(pieces, fmt.Sprintf(" %#v\n", ev))
	}
	return strings.Join(pieces, "")
}
// taskRunnerTestCtx bundles a TaskRunner under test with the mocks and
// directories created for it by testTaskRunnerFromAlloc.
type taskRunnerTestCtx struct {
// upd receives the task runner's state updates.
upd *MockTaskStateUpdater
// tr is the task runner under test.
tr *TaskRunner
// allocDir is the allocation directory built for the test.
allocDir *allocdir.AllocDir
// vault is the mock Vault client handed to the task runner.
vault *vaultclient.MockVaultClient
// consul is the mock Consul agent backing consulClient.
consul *consul.MockAgent
// consulClient is the service client handed to the task runner.
consulClient *consul.ServiceClient
}
// Cleanup tears down what the test context owns, in order: it shuts down the
// Consul service client, destroys the task runner with a TaskKilled event,
// and then destroys the alloc dir.
func (tc *taskRunnerTestCtx) Cleanup() {
	tc.consulClient.Shutdown()

	killed := structs.NewTaskEvent(structs.TaskKilled)
	tc.tr.Destroy(killed)

	tc.allocDir.Destroy()
}
// testTaskRunner returns a test context for the first task of mock.Alloc(),
// switched to the mock driver and configured with run_for = 500ms. The
// restarts flag is forwarded unchanged to testTaskRunnerFromAlloc.
func testTaskRunner(t *testing.T, restarts bool) *taskRunnerTestCtx {
	a := mock.Alloc()

	first := a.Job.TaskGroups[0].Tasks[0]
	first.Driver = "mock_driver"
	first.Config["run_for"] = "500ms"

	return testTaskRunnerFromAlloc(t, restarts, a)
}
// testTaskRunnerFromAlloc creates a mock task runner using the first task in
// the first task group of the passed allocation.
//
// It backs the runner with a bolt state DB opened on a fresh temp file,
// builds the alloc dir and task dir under os.TempDir(), and wires in a mock
// Vault client plus a Consul service client running against a mock agent.
// When restarts is false the runner's restart tracker is replaced with
// noRestartsTracker(). Any setup error fails the test via t.Fatalf.
//
// Callers should defer Cleanup() to cleanup after completion
func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocation) *taskRunnerTestCtx {
	logger := testlog.Logger(t)
	conf := config.DefaultConfig()
	conf.Node = mock.Node()
	conf.StateDir = os.TempDir()
	conf.AllocDir = os.TempDir()

	tmp, err := ioutil.TempFile("", "state-db")
	if err != nil {
		t.Fatalf("error creating state db file: %v", err)
	}
	// Only the path is needed below; bolt opens the file itself, so release
	// our handle rather than leaking a descriptor per test.
	if err := tmp.Close(); err != nil {
		t.Fatalf("error closing state db file: %v", err)
	}

	db, err := bolt.Open(tmp.Name(), 0600, nil)
	if err != nil {
		t.Fatalf("error creating state db: %v", err)
	}

	upd := &MockTaskStateUpdater{}
	task := alloc.Job.TaskGroups[0].Tasks[0]

	allocDir := allocdir.NewAllocDir(testlog.Logger(t), filepath.Join(conf.AllocDir, alloc.ID))
	if err := allocDir.Build(); err != nil {
		// t.Fatalf does not return, so no explicit return is needed.
		t.Fatalf("error building alloc dir: %v", err)
	}

	//HACK to get FSIsolation and chroot without using AllocRunner,
	// TaskRunner, or Drivers
	fsi := cstructs.FSIsolationImage
	switch task.Driver {
	case "raw_exec":
		fsi = cstructs.FSIsolationNone
	case "exec", "java":
		fsi = cstructs.FSIsolationChroot
	}

	taskDir := allocDir.NewTaskDir(task.Name)
	if err := taskDir.Build(false, config.DefaultChrootEnv, fsi); err != nil {
		t.Fatalf("error building task dir %q: %v", task.Name, err)
	}

	vclient := vaultclient.NewMockVaultClient()
	cclient := consul.NewMockAgent()
	serviceClient := consul.NewServiceClient(cclient, logger, true)
	go serviceClient.Run()

	tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
	if !restarts {
		tr.restartTracker = noRestartsTracker()
	}

	return &taskRunnerTestCtx{
		upd:          upd,
		tr:           tr,
		allocDir:     allocDir,
		vault:        vclient,
		consul:       cclient,
		consulClient: serviceClient,
	}
}
// testWaitForTaskToStart waits, via testutil.WaitForResult, until the events
// recorded on ctx.upd show the task has started; the failure callback fails
// the test with t.Fatalf. Two leading event sequences are accepted:
// Received, Started (exactly two events so far) or Received, Setup, Started.
func testWaitForTaskToStart(t *testing.T, ctx *taskRunnerTestCtx) {
	started := func() (bool, error) {
		n := len(ctx.upd.events)
		if n < 2 {
			return false, fmt.Errorf("Expect two events; got %v", n)
		}

		if got := ctx.upd.events[0].Type; got != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", got, structs.TaskReceived)
		}

		// With only two events recorded the second one must be the start.
		if n < 3 {
			if got := ctx.upd.events[1].Type; got != structs.TaskStarted {
				return false, fmt.Errorf("Second Event was %v; want %v", got, structs.TaskStarted)
			}
			return true, nil
		}

		// Three or more events: expect a setup event before the start.
		if got := ctx.upd.events[1].Type; got != structs.TaskSetup {
			return false, fmt.Errorf("Second Event was %v; want %v", got, structs.TaskSetup)
		}
		if got := ctx.upd.events[2].Type; got != structs.TaskStarted {
			return false, fmt.Errorf("Third Event was %v; want %v", got, structs.TaskStarted)
		}
		return true, nil
	}

	onFailure := func(err error) {
		t.Fatalf("err: %v", err)
	}

	testutil.WaitForResult(started, onFailure)
}
// TestTaskRunner_SimpleRun runs the default short-lived mock task to
// completion and asserts the exact sequence of four events (Received, Setup,
// Started, Terminated), their display messages, the exit_code detail, and
// that the final task state is dead.
func TestTaskRunner_SimpleRun(t *testing.T) {
	t.Parallel()
	ctx := testTaskRunner(t, false)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for the task runner to exit, bounded by a scaled timeout.
	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 4 {
		t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	event := ctx.upd.events[0]
	if event.Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
	}

	event = ctx.upd.events[1]
	if event.Type != structs.TaskSetup {
		t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
	}
	displayMsg := event.DisplayMessage
	if displayMsg != "Building Task Directory" {
		t.Fatalf("Bad display message:%v", displayMsg)
	}

	event = ctx.upd.events[2]
	if event.Type != structs.TaskStarted {
		t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
	}
	displayMsg = event.DisplayMessage
	if displayMsg != "Task started by client" {
		t.Fatalf("Bad display message:%v", displayMsg)
	}

	event = ctx.upd.events[3]
	if event.Type != structs.TaskTerminated {
		t.Fatalf("Fourth Event was %v; want %v", event.Type, structs.TaskTerminated)
	}
	displayMsg = event.DisplayMessage
	if displayMsg != "Exit Code: 0" {
		t.Fatalf("Bad display message:%v", displayMsg)
	}
	if event.Details["exit_code"] != "0" {
		t.Fatalf("Bad details map :%v", event.Details)
	}
}
// TestTaskRunner_Run_RecoverableStartError configures the mock driver to
// fail at start with a recoverable error and waits until the recorded events
// begin with Received, Setup, DriverFailure, Restarting.
func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
	t.Parallel()

	a := mock.Alloc()
	mockTask := a.Job.TaskGroups[0].Tasks[0]
	mockTask.Driver = "mock_driver"
	mockTask.Config = map[string]interface{}{
		"start_error":             "driver failure",
		"start_error_recoverable": true,
		"exit_code":               0,
	}

	ctx := testTaskRunnerFromAlloc(t, true, a)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Polled condition: the first four events match the expected sequence.
	sawRestart := func() (bool, error) {
		if n := len(ctx.upd.events); n < 4 {
			return false, fmt.Errorf("Expect at least four events; got %v", n)
		}
		if got := ctx.upd.events[0].Type; got != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", got, structs.TaskReceived)
		}
		if got := ctx.upd.events[1].Type; got != structs.TaskSetup {
			return false, fmt.Errorf("Second Event was %v; want %v", got, structs.TaskSetup)
		}
		if got := ctx.upd.events[2].Type; got != structs.TaskDriverFailure {
			return false, fmt.Errorf("Third Event was %v; want %v", got, structs.TaskDriverFailure)
		}
		if got := ctx.upd.events[3].Type; got != structs.TaskRestarting {
			return false, fmt.Errorf("Fourth Event was %v; want %v", got, structs.TaskRestarting)
		}
		return true, nil
	}

	testutil.WaitForResult(sawRestart, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestTaskRunner_Destroy starts a long-running mock task, destroys the task
// runner once the task has started, and asserts that the runner exits with
// five events ending in Killing and Killed and a dead task state.
func TestTaskRunner_Destroy(t *testing.T) {
	t.Parallel()
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "1000s",
	}

	ctx := testTaskRunnerFromAlloc(t, true, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for the task to start
	testWaitForTaskToStart(t, ctx)

	// Begin the tear down
	ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))

	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 5 {
		t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	if ctx.upd.events[3].Type != structs.TaskKilling {
		t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
	}

	if ctx.upd.events[4].Type != structs.TaskKilled {
		t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
	}
}
// TestTaskRunner_Update starts a long-running mock task, pushes a modified
// copy of the allocation through Update, and waits until the runner reflects
// it: the task is a copy (not the same pointer), the restart policy mode is
// the new one, the driver handle ID has changed, and the single registered
// Consul check has its path interpolated from the updated task meta (#2180).
func TestTaskRunner_Update(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
// Replace the first check with one whose path comes from task meta so the
// interpolation can be observed after the update.
task.Services[0].Checks[0] = &structs.ServiceCheck{
Name: "http-check",
Type: "http",
PortLabel: "http",
Path: "${NOMAD_META_foo}",
}
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "100s",
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
testWaitForTaskToStart(t, ctx)
// Update the task definition
updateAlloc := ctx.tr.alloc.Copy()
// Update the restart policy
newTG := updateAlloc.Job.TaskGroups[0]
newMode := "foo"
newTG.RestartPolicy.Mode = newMode
newTask := newTG.Tasks[0]
newTask.Driver = "mock_driver"
// Update meta to make sure service checks are interpolated correctly
// #2180
newTask.Meta["foo"] = "/UPDATE"
// Remember the current handle ID, then update the kill timeout
oldHandle := ctx.tr.handle.ID()
newTask.KillTimeout = time.Hour
ctx.tr.Update(updateAlloc)
// Wait for the update to take place
testutil.WaitForResult(func() (bool, error) {
if ctx.tr.task == newTask {
return false, fmt.Errorf("We copied the pointer! This would be very bad")
}
if ctx.tr.task.Driver != newTask.Driver {
return false, fmt.Errorf("Task not copied")
}
if ctx.tr.restartTracker.GetPolicy().Mode != newMode {
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.GetPolicy().Mode)
}
if ctx.tr.handle.ID() == oldHandle {
return false, fmt.Errorf("handle not ctx.updated")
}
// Make sure Consul services were interpolated correctly during
// the update #2180
checks := ctx.consul.CheckRegs()
if n := len(checks); n != 1 {
return false, fmt.Errorf("expected 1 check but found %d", n)
}
for _, check := range checks {
if found := check.HTTP; !strings.HasSuffix(found, "/UPDATE") {
return false, fmt.Errorf("expected consul check path to end with /UPDATE but found: %q", found)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestTaskRunner_SaveRestoreState starts a mock task that has a Vault stanza,
// saves the runner's state once the task is running, then builds a second
// task runner over the same state DB and task dir, restores into it, runs it
// to completion, and asserts the restored runner recovered the same Vault
// token that the first runner wrote to disk.
func TestTaskRunner_SaveRestoreState(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "5s",
}
// Give it a Vault token
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to be running and then snapshot the state
testWaitForTaskToStart(t, ctx)
if err := ctx.tr.SaveState(); err != nil {
t.Fatalf("err: %v", err)
}
// Read the token from the file system
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
token := string(data)
if len(token) == 0 {
t.Fatalf("Token not written to disk")
}
// Create a new task runner that shares the first runner's logger, config,
// state DB, task dir, alloc and clients, but gets a fresh Task value
// carrying only the name, driver and Vault stanza.
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault}
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update,
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul)
tr2.restartTracker = noRestartsTracker()
if _, err := tr2.RestoreState(); err != nil {
t.Fatalf("err: %v", err)
}
go tr2.Run()
defer tr2.Destroy(structs.NewTaskEvent(structs.TaskKilled))
// Wait for the restored runner to exit (its Destroy is deferred above)
select {
case <-tr2.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
// Check that we recovered the token
if act := tr2.vaultFuture.Get(); act != token {
t.Fatalf("Vault token not properly recovered")
}
}
// TestTaskRunner_Download_List serves the current directory over a local
// HTTP test server, runs a mock task with two artifacts pointing at files in
// it, and asserts the event sequence (Received, Setup, DownloadingArtifacts,
// Started, Terminated) and that both files exist in the task directory.
func TestTaskRunner_Download_List(t *testing.T) {
	t.Parallel()
	ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("."))))
	defer ts.Close()

	// Create an allocation that has a task with a list of artifacts.
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	f1 := "task_runner_test.go"
	f2 := "task_runner.go"
	artifact1 := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
	}
	artifact2 := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2),
	}
	task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 5 {
		t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	if ctx.upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
	}

	if ctx.upd.events[1].Type != structs.TaskSetup {
		t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
	}

	if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
		t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
	}

	if ctx.upd.events[3].Type != structs.TaskStarted {
		t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskStarted)
	}

	if ctx.upd.events[4].Type != structs.TaskTerminated {
		t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskTerminated)
	}

	// Check that both files exist.
	if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
		t.Fatalf("%v not downloaded", f1)
	}
	if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f2)); err != nil {
		t.Fatalf("%v not downloaded", f2)
	}
}
// TestTaskRunner_Download_Retries gives a mock task an artifact source on
// 127.0.0.1 port 0 and a restart policy with one attempt in fail mode, then
// asserts the runner exits dead after exactly eight events: Received, Setup,
// DownloadingArtifacts, ArtifactDownloadFailed, Restarting,
// DownloadingArtifacts, ArtifactDownloadFailed, NotRestarting.
func TestTaskRunner_Download_Retries(t *testing.T) {
t.Parallel()
// Create an allocation that has a task with bad artifacts.
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
artifact := structs.TaskArtifact{
GetterSource: "http://127.0.0.1:0/foo/bar/baz",
}
task.Artifacts = []*structs.TaskArtifact{&artifact}
// Make the restart policy allow a single restart attempt
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
Attempts: 1,
Interval: 10 * time.Minute,
Delay: 1 * time.Second,
Mode: structs.RestartPolicyModeFail,
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the runner to give up and exit, bounded by a scaled timeout.
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 8 {
t.Fatalf("should have 8 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
// First download attempt fails and a restart is scheduled.
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[3].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskArtifactDownloadFailed)
}
if ctx.upd.events[4].Type != structs.TaskRestarting {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskRestarting)
}
// Second download attempt fails and the task is not restarted again.
if ctx.upd.events[5].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[6].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskArtifactDownloadFailed)
}
if ctx.upd.events[7].Type != structs.TaskNotRestarting {
t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskNotRestarting)
}
}
// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered from
// Consul when waiting to be retried.
//
// The mock task exits with code 1 almost immediately and the restart policy
// allows one retry, so the mock Consul service client is expected to record
// six operations: add, remove, remove for the first run and the same three
// again for the retry.
func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
	t.Parallel()

	// Create an allocation whose task fails right after starting.
	alloc := mock.Alloc()

	// Make the restart policy allow a single restart attempt
	alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
		Attempts: 1,
		Interval: 10 * time.Minute,
		Delay:    time.Nanosecond,
		Mode:     structs.RestartPolicyModeFail,
	}

	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "1",
		"run_for":   "1ns",
	}

	ctx := testTaskRunnerFromAlloc(t, true, alloc)

	// Use mockConsulServiceClient. The local is named mockConsul so it does
	// not shadow the imported consul package.
	mockConsul := consulApi.NewMockConsulServiceClient(t)
	ctx.tr.consul = mockConsul

	ctx.tr.MarkReceived()
	// Run synchronously: the ops are inspected only after the runner exits.
	ctx.tr.Run()
	defer ctx.Cleanup()

	// Assert it is properly registered and unregistered. A count mismatch is
	// fatal because the per-index checks below would otherwise panic with an
	// index out of range when fewer than six ops were recorded.
	if expected := 6; len(mockConsul.Ops) != expected {
		t.Fatalf("expected %d consul ops but found: %d", expected, len(mockConsul.Ops))
	}
	if mockConsul.Ops[0].Op != "add" {
		t.Errorf("expected first Op to be add but found: %q", mockConsul.Ops[0].Op)
	}
	if mockConsul.Ops[1].Op != "remove" {
		t.Errorf("expected second op to be remove but found: %q", mockConsul.Ops[1].Op)
	}
	if mockConsul.Ops[2].Op != "remove" {
		t.Errorf("expected third op to be remove but found: %q", mockConsul.Ops[2].Op)
	}
	if mockConsul.Ops[3].Op != "add" {
		t.Errorf("expected fourth op to be add but found: %q", mockConsul.Ops[3].Op)
	}
	if mockConsul.Ops[4].Op != "remove" {
		t.Errorf("expected fifth op to be remove but found: %q", mockConsul.Ops[4].Op)
	}
	if mockConsul.Ops[5].Op != "remove" {
		t.Errorf("expected sixth op to be remove but found: %q", mockConsul.Ops[5].Op)
	}
}
// TestTaskRunner_Validate_UserEnforcement checks validateTask's handling of
// the task user per driver: root with exec must be rejected, while a
// non-blacklisted user with exec and root with docker must be accepted.
func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
	t.Parallel()
	ctx := testTaskRunner(t, false)
	defer ctx.Cleanup()

	scenarios := []struct {
		driver  string
		user    string
		wantErr bool
	}{
		// Try to run as root with exec.
		{driver: "exec", user: "root", wantErr: true},
		// Try to run a non-blacklisted user with exec.
		{driver: "exec", user: "foobar", wantErr: false},
		// Try to run as root with docker.
		{driver: "docker", user: "root", wantErr: false},
	}

	for _, sc := range scenarios {
		ctx.tr.task.Driver = sc.driver
		ctx.tr.task.User = sc.user

		err := ctx.tr.validateTask()
		switch {
		case sc.wantErr && err == nil:
			t.Fatalf("expected error running as root with exec")
		case !sc.wantErr && err != nil:
			t.Fatalf("unexpected error: %v", err)
		}
	}
}
// TestTaskRunner_RestartTask starts a long-running mock task, restarts it
// once it is running, kills it after the restart has produced eight events,
// and asserts the full ten-event sequence: Received, Setup, Started,
// RestartSignal, Killing, Killed, Restarting, Started, Killing, Killed.
func TestTaskRunner_RestartTask(t *testing.T) {
	t.Parallel()
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "100s",
	}

	ctx := testTaskRunnerFromAlloc(t, true, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for it to start
	//
	// NOTE(review): testWaitForTaskToStart and the failure callback below
	// call t.Fatalf from goroutines other than the one running the test;
	// the testing package documents FailNow as test-goroutine only.
	go func() {
		testWaitForTaskToStart(t, ctx)
		ctx.tr.Restart("test", "restart", false)

		// Wait for it to restart then kill
		go func() {
			// Wait for the task to start again
			testutil.WaitForResult(func() (bool, error) {
				if len(ctx.upd.events) != 8 {
					return false, fmt.Errorf("task %q in alloc %q should have 8 ctx.updates: %#v", task.Name, alloc.ID, ctx.upd.events)
				}

				return true, nil
			}, func(err error) {
				t.Fatalf("err: %v", err)
			})
			ctx.tr.Kill("test", "restart", false)
		}()
	}()

	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 10 {
		t.Fatalf("should have 10 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	if ctx.upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
	}

	if ctx.upd.events[1].Type != structs.TaskSetup {
		t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
	}

	if ctx.upd.events[2].Type != structs.TaskStarted {
		t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
	}

	if ctx.upd.events[3].Type != structs.TaskRestartSignal {
		t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestartSignal)
	}

	if ctx.upd.events[4].Type != structs.TaskKilling {
		t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilling)
	}

	if ctx.upd.events[5].Type != structs.TaskKilled {
		t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskKilled)
	}

	if ctx.upd.events[6].Type != structs.TaskRestarting {
		t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskRestarting)
	}

	// Report the event actually being checked (index 7, not 8).
	if ctx.upd.events[7].Type != structs.TaskStarted {
		t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskStarted)
	}

	if ctx.upd.events[8].Type != structs.TaskKilling {
		t.Fatalf("Ninth Event was %v; want %v", ctx.upd.events[8].Type, structs.TaskKilling)
	}

	if ctx.upd.events[9].Type != structs.TaskKilled {
		t.Fatalf("Tenth Event was %v; want %v", ctx.upd.events[9].Type, structs.TaskKilled)
	}
}
// TestTaskRunner_KillTask starts a mock task, kills it with fail=true once
// it is running, and asserts the runner exits dead and failed with exactly
// five events: Received, Setup, Started, Killing, Killed.
func TestTaskRunner_KillTask(t *testing.T) {
	t.Parallel()
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Kill the task from a helper goroutine as soon as it has started.
	go func() {
		testWaitForTaskToStart(t, ctx)
		ctx.tr.Kill("test", "kill", true)
	}()

	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 5 {
		t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	if !ctx.upd.failed {
		t.Fatalf("TaskState should be failed: %+v", ctx.upd)
	}

	if ctx.upd.events[0].Type != structs.TaskReceived {
		t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
	}

	if ctx.upd.events[1].Type != structs.TaskSetup {
		t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
	}

	if ctx.upd.events[2].Type != structs.TaskStarted {
		t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
	}

	if ctx.upd.events[3].Type != structs.TaskKilling {
		t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
	}

	if ctx.upd.events[4].Type != structs.TaskKilled {
		t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
	}
}
// TestTaskRunner_SignalFailure configures the mock driver with a
// signal_error and asserts that Signal on the running task returns an error.
func TestTaskRunner_SignalFailure(t *testing.T) {
	t.Parallel()

	a := mock.Alloc()
	mockTask := a.Job.TaskGroups[0].Tasks[0]
	mockTask.Driver = "mock_driver"
	mockTask.Config = map[string]interface{}{
		"signal_error": "test forcing failure",
		"run_for":      "10s",
		"exit_code":    "0",
	}

	ctx := testTaskRunnerFromAlloc(t, false, a)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// The signal can only be delivered once the task is running.
	testWaitForTaskToStart(t, ctx)

	err := ctx.tr.Signal("test", "test", syscall.SIGINT)
	if err == nil {
		t.Fatalf("Didn't receive error")
	}
}
// TestTaskRunner_BlockForVault installs a DeriveTokenFn on the mock Vault
// client that blocks on a channel, and asserts in two phases: while the
// channel is open the runner has not exited after one second and shows only
// Received and Setup in the pending state; once the channel is closed the
// task runs to completion (Received, Setup, Started, Terminated), the token
// is written to the secrets dir, and the mock client records it as stopped.
func TestTaskRunner_BlockForVault(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
// Control when we get a Vault token
token := "1234"
waitCh := make(chan struct{})
handler := func(*structs.Allocation, []string) (map[string]string, error) {
// Block token derivation until the test closes waitCh.
<-waitCh
return map[string]string{task.Name: token}, nil
}
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
// Start the runner only after the handler is installed.
go ctx.tr.Run()
// Phase 1: the runner must still be alive after one second.
select {
case <-ctx.tr.WaitCh():
t.Fatalf("premature exit")
case <-time.After(1 * time.Second):
}
if len(ctx.upd.events) != 2 {
t.Fatalf("should have 2 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStatePending {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
// Unblock
close(waitCh)
// Phase 2: the runner must now run the task and exit.
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
// Check that the token is on disk
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if act := string(data); act != token {
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
}
// Check the token was revoked
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
testutil.WaitForResult(func() (bool, error) {
if len(m.StoppedTokens) != 1 {
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
}
if a := m.StoppedTokens[0]; a != token {
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestTaskRunner_DeriveToken_Retry installs a DeriveTokenFn that returns a
// recoverable error on its first call and a token on later calls, then
// asserts the task still runs to completion (Received, Setup, Started,
// Terminated), the token is written to the secrets dir, and the mock Vault
// client records it as stopped.
func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
// Control when we get a Vault token
token := "1234"
// count tracks whether the handler has already failed once.
count := 0
handler := func(*structs.Allocation, []string) (map[string]string, error) {
if count > 0 {
return map[string]string{task.Name: token}, nil
}
// First call: fail with an error marked recoverable.
count++
return nil, structs.NewRecoverableError(fmt.Errorf("Want a retry"), true)
}
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
// Start the runner only after the handler is installed.
go ctx.tr.Run()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
// Check that the token is on disk
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if act := string(data); act != token {
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
}
// Check the token was revoked
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
testutil.WaitForResult(func() (bool, error) {
if len(m.StoppedTokens) != 1 {
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
}
if a := m.StoppedTokens[0]; a != token {
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestTaskRunner_DeriveToken_Unrecoverable makes the mock Vault client fail
// token derivation with a plain (non-recoverable) error and waits until the
// recorded events are exactly Received, Setup, Killing.
func TestTaskRunner_DeriveToken_Unrecoverable(t *testing.T) {
	t.Parallel()

	a := mock.Alloc()
	mockTask := a.Job.TaskGroups[0].Tasks[0]
	mockTask.Driver = "mock_driver"
	mockTask.Config = map[string]interface{}{
		"run_for":   "10s",
		"exit_code": "0",
	}
	mockTask.Vault = &structs.Vault{
		ChangeMode: structs.VaultChangeModeRestart,
		Policies:   []string{"default"},
	}

	ctx := testTaskRunnerFromAlloc(t, false, a)
	ctx.tr.MarkReceived()
	defer ctx.Cleanup()

	// Error the token derivation
	mockVault := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
	mockVault.SetDeriveTokenError(a.ID, []string{mockTask.Name}, fmt.Errorf("Non recoverable"))
	go ctx.tr.Run()

	// Wait for the failed derivation to surface as a Killing event.
	sawKilling := func() (bool, error) {
		if n := len(ctx.upd.events); n != 3 {
			return false, fmt.Errorf("Expect 3 events; got %v", n)
		}
		if got := ctx.upd.events[0].Type; got != structs.TaskReceived {
			return false, fmt.Errorf("First Event was %v; want %v", got, structs.TaskReceived)
		}
		if got := ctx.upd.events[1].Type; got != structs.TaskSetup {
			return false, fmt.Errorf("Second Event was %v; want %v", got, structs.TaskSetup)
		}
		if got := ctx.upd.events[2].Type; got != structs.TaskKilling {
			return false, fmt.Errorf("Third Event was %v; want %v", got, structs.TaskKilling)
		}
		return true, nil
	}

	testutil.WaitForResult(sawKilling, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestTaskRunner_Template_Block gives a mock task an embedded template that
// reads the key "foo" and asserts in two phases: the runner has not exited
// after one second and shows only Received and Setup in the pending state;
// after UnblockStart is called the task runs to completion with Received,
// Setup, Started, Terminated.
func TestTaskRunner_Template_Block(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
// NOTE(review): presumably the template cannot render because the key is
// never provided in this test, which is what keeps the task from starting
// - confirm against the template manager.
task.Templates = []*structs.Template{
{
EmbeddedTmpl: "{{key \"foo\"}}",
DestPath: "local/test",
ChangeMode: structs.TemplateChangeModeNoop,
},
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Phase 1: the runner must still be alive after one second.
select {
case <-ctx.tr.WaitCh():
t.Fatalf("premature exit")
case <-time.After(1 * time.Second):
}
if len(ctx.upd.events) != 2 {
t.Fatalf("should have 2 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStatePending {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
// Unblock
ctx.tr.UnblockStart("test")
// Phase 2: the runner must now run the task and exit.
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
}
// TestTaskRunner_Template_Artifact checks that a template whose source is a
// downloaded artifact is rendered: the task must emit the Received, Setup,
// DownloadingArtifacts, Started and Terminated events, and both the artifact
// and the rendered template must exist on disk afterwards.
func TestTaskRunner_Template_Artifact(t *testing.T) {
	t.Parallel()

	dir, err := os.Getwd()
	if err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Serve files from three directories above the working directory so the
	// artifact getter can fetch CHANGELOG.md over HTTP.
	ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Join(dir, "../../.."))))
	defer ts.Close()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}

	// Create an allocation that has a task that renders a template from an
	// artifact
	f1 := "CHANGELOG.md"
	artifact := structs.TaskArtifact{
		GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
	}
	task.Artifacts = []*structs.TaskArtifact{&artifact}
	task.Templates = []*structs.Template{
		{
			// The template source is the downloaded artifact itself.
			SourcePath: f1,
			DestPath:   "local/test",
			ChangeMode: structs.TemplateChangeModeNoop,
		},
	}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	defer ctx.Cleanup()
	go ctx.tr.Run()

	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	if len(ctx.upd.events) != 5 {
		t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
	}

	if ctx.upd.state != structs.TaskStateDead {
		t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
	}

	wantEvents := []struct {
		ordinal string
		typ     string
	}{
		{"First", structs.TaskReceived},
		{"Second", structs.TaskSetup},
		{"Third", structs.TaskDownloadingArtifacts},
		{"Fourth", structs.TaskStarted},
		{"Fifth", structs.TaskTerminated},
	}
	for i, want := range wantEvents {
		if got := ctx.upd.events[i].Type; got != want.typ {
			t.Fatalf("%s Event was %v; want %v", want.ordinal, got, want.typ)
		}
	}

	// Check that both files exist. The Stat error is reported so that a
	// failure other than "does not exist" is not misread as a missing file.
	if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
		t.Fatalf("%v not downloaded: %v", f1, err)
	}
	if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.LocalDir, "test")); err != nil {
		t.Fatalf("template not rendered: %v", err)
	}
}
// TestTaskRunner_Template_NewVaultToken checks what happens when renewal of a
// templated task's Vault token fails: the runner must expose a different
// token, swap in a different template manager, and the original token must
// show up in the mock Vault client's StoppedTokens.
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}
	task.Templates = []*structs.Template{
		{
			EmbeddedTmpl: "{{key \"foo\"}}",
			DestPath:     "local/test",
			ChangeMode:   structs.TemplateChangeModeNoop,
		},
	}
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.MarkReceived()
	defer tc.Cleanup()
	go tc.tr.Run()

	// Every wait below aborts the test the same way when it gives up.
	giveUp := func(err error) {
		t.Fatalf("err: %v", err)
	}

	// Block until the runner has derived its first token.
	var firstToken string
	testutil.WaitForResult(func() (bool, error) {
		firstToken = tc.tr.vaultFuture.Get()
		if firstToken != "" {
			return true, nil
		}
		return false, fmt.Errorf("No Vault token")
	}, giveUp)

	// Fail the renewal of that token through the mock's renewal channel.
	renewalCh, found := tc.vault.RenewTokens[firstToken]
	if !found {
		t.Fatalf("no renewal channel")
	}

	// Remember the current manager so a replacement can be detected.
	prevManager := tc.tr.templateManager

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// A different token and a different template manager must appear.
	var rotated string
	testutil.WaitForResult(func() (bool, error) {
		rotated = tc.tr.vaultFuture.Get()
		switch {
		case rotated == "" || rotated == firstToken:
			return false, fmt.Errorf("No new Vault token")
		case tc.tr.templateManager == prevManager:
			return false, fmt.Errorf("Template manager not ctx.updated")
		}
		return true, nil
	}, giveUp)

	// The first token must be the only one the mock reports as stopped.
	testutil.WaitForResult(func() (bool, error) {
		if len(tc.vault.StoppedTokens) != 1 {
			return false, fmt.Errorf("Expected a stopped token: %v", tc.vault.StoppedTokens)
		}
		if stopped := tc.vault.StoppedTokens[0]; stopped != firstToken {
			return false, fmt.Errorf("got stopped token %q; want %q", stopped, firstToken)
		}
		return true, nil
	}, giveUp)
}
// TestTaskRunner_VaultManager_Restart checks that, with the Vault change mode
// set to restart, a failed token renewal restarts the task: the event stream
// must be Received, Setup, Started, RestartSignal, Killing, Killed,
// Restarting, Started.
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	task.Vault = &structs.Vault{
		Policies:   []string{"default"},
		ChangeMode: structs.VaultChangeModeRestart,
	}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	defer ctx.Cleanup()
	go ctx.tr.Run()

	// Wait for the task to start
	testWaitForTaskToStart(t, ctx)

	// Error the token renewal
	renewalCh, ok := ctx.vault.RenewTokens[ctx.tr.vaultFuture.Get()]
	if !ok {
		t.Fatalf("no renewal channel")
	}

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// Expected events, paired with the ordinal used in failure messages so
	// the reported "want" value always matches the value actually checked.
	wantEvents := []struct {
		ordinal string
		typ     string
	}{
		{"First", structs.TaskReceived},
		{"Second", structs.TaskSetup},
		{"Third", structs.TaskStarted},
		{"Fourth", structs.TaskRestartSignal},
		{"Fifth", structs.TaskKilling},
		{"Sixth", structs.TaskKilled},
		{"Seventh", structs.TaskRestarting},
		{"Eighth", structs.TaskStarted},
	}

	// Ensure a restart
	testutil.WaitForResult(func() (bool, error) {
		if l := len(ctx.upd.events); l != len(wantEvents) {
			return false, fmt.Errorf("Expect eight events; got %#v", ctx.upd.events)
		}

		for i, want := range wantEvents {
			if got := ctx.upd.events[i].Type; got != want.typ {
				return false, fmt.Errorf("%s Event was %v; want %v", want.ordinal, got, want.typ)
			}
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestTaskRunner_VaultManager_Signal checks that, with the Vault change mode
// set to signal (SIGUSR1), a failed token renewal signals the task: the event
// stream must be exactly Received, Setup, Started, Signaling.
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "10s",
	}
	task.Vault = &structs.Vault{
		Policies:     []string{"default"},
		ChangeMode:   structs.VaultChangeModeSignal,
		ChangeSignal: "SIGUSR1",
	}

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.MarkReceived()
	go tc.tr.Run()
	defer tc.Cleanup()

	// The task has to be running before its token renewal is failed.
	testWaitForTaskToStart(t, tc)

	// Fail the renewal of the current token via the mock's renewal channel.
	renewalCh, found := tc.vault.RenewTokens[tc.tr.vaultFuture.Get()]
	if !found {
		t.Fatalf("no renewal channel")
	}
	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	// The task must be signaled; no further events are expected.
	ordinals := []string{"First", "Second", "Third", "Fourth"}
	wantTypes := []string{
		structs.TaskReceived,
		structs.TaskSetup,
		structs.TaskStarted,
		structs.TaskSignaling,
	}
	testutil.WaitForResult(func() (bool, error) {
		if n := len(tc.upd.events); n != 4 {
			return false, fmt.Errorf("Expect four events; got %#v", tc.upd.events)
		}
		for i, want := range wantTypes {
			if got := tc.upd.events[i].Type; got != want {
				return false, fmt.Errorf("%s Event was %v; want %v", ordinals[i], got, want)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestTaskRunner_SimpleRun_Dispatch checks that a snappy-compressed job
// payload is decoded and written to the file named by the task's dispatch
// payload config inside the task's local directory.
func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "1s",
	}

	const fileName = "test"
	task.DispatchPayload = &structs.DispatchPayloadConfig{File: fileName}
	alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{}

	// Attach the payload in snappy-compressed form.
	expected := []byte("hello world")
	alloc.Job.Payload = snappy.Encode(nil, expected)

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.MarkReceived()
	defer tc.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
	defer tc.allocDir.Destroy()
	go tc.tr.Run()

	exitTimeout := time.Duration(testutil.TestMultiplier()*15) * time.Second
	select {
	case <-tc.tr.WaitCh():
	case <-time.After(exitTimeout):
		t.Fatalf("timeout")
	}

	if n := len(tc.upd.events); n != 4 {
		t.Fatalf("should have 4 updates: %#v", tc.upd.events)
	}
	if got, want := tc.upd.state, structs.TaskStateDead; got != want {
		t.Fatalf("TaskState %v; want %v", got, want)
	}

	ordinals := []string{"First", "Second", "Third", "Fourth"}
	wantTypes := []string{
		structs.TaskReceived,
		structs.TaskSetup,
		structs.TaskStarted,
		structs.TaskTerminated,
	}
	for i, want := range wantTypes {
		if got := tc.upd.events[i].Type; got != want {
			t.Fatalf("%s Event was %v; want %v", ordinals[i], got, want)
		}
	}

	// The decoded payload must be on disk, byte-for-byte.
	data, err := ioutil.ReadFile(filepath.Join(tc.tr.taskDir.LocalDir, fileName))
	if err != nil {
		t.Fatalf("Failed to read file: %v", err)
	}
	if same := reflect.DeepEqual(data, expected); !same {
		t.Fatalf("Bad; got %v; want %v", string(data), string(expected))
	}
}
// TestTaskRunner_CleanupEmpty runs a mock_driver task to completion without
// registering any created resources and asserts that the runner's
// createdResources map is still empty afterwards.
func TestTaskRunner_CleanupEmpty(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.MarkReceived()
	defer tc.Cleanup()

	// Run is called synchronously, so the assertion below only executes
	// after the runner has returned.
	tc.tr.Run()

	// This test registered no resources, so none may be left behind.
	if remaining := len(tc.tr.createdResources.Resources); remaining != 0 {
		t.Fatalf("createdResources should still be empty: %v", tc.tr.createdResources)
	}
}
// TestTaskRunner_CleanupOK seeds the runner with created resources and
// configures cleanup to fail once for the "ERR" key. After the run, every
// resource must have been removed.
func TestTaskRunner_CleanupOK(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"

	const failKey = "ERR"

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.config.Options = map[string]string{
		"cleanup_fail_on":  failKey,
		"cleanup_fail_num": "1",
	}
	tc.tr.MarkReceived()

	// One resource set under the failing key, one under an ordinary key.
	resources := tc.tr.createdResources.Resources
	resources[failKey] = []string{"x", "y"}
	resources["foo"] = []string{"z"}

	defer tc.Cleanup()
	tc.tr.Run()

	// Cleanup was configured to fail only once, so nothing may remain.
	if len(tc.tr.createdResources.Resources) != 0 {
		t.Fatalf("expected all created resources to be removed: %#v", tc.tr.createdResources.Resources)
	}
}
// TestTaskRunner_CleanupFail seeds the runner with created resources and
// configures cleanup to fail five times for the "ERR" key. After the run,
// only the resources under that key may remain.
func TestTaskRunner_CleanupFail(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"

	const failKey = "ERR"

	tc := testTaskRunnerFromAlloc(t, false, alloc)
	tc.tr.config.Options = map[string]string{
		"cleanup_fail_on":  failKey,
		"cleanup_fail_num": "5",
	}
	tc.tr.MarkReceived()

	// One resource set under the failing key, one under an ordinary key.
	resources := tc.tr.createdResources.Resources
	resources[failKey] = []string{"x"}
	resources["foo"] = []string{"y", "z"}

	defer tc.Cleanup()
	tc.tr.Run()

	// Five configured failures exceed the number of cleanup attempts
	// (presumably 3, per the original note; confirm against the runner's
	// cleanup logic), so the failing key survives while "foo" is removed.
	leftover := map[string][]string{failKey: {"x"}}
	if !reflect.DeepEqual(leftover, tc.tr.createdResources.Resources) {
		t.Fatalf("expected %#v but found: %#v", leftover, tc.tr.createdResources.Resources)
	}
}
// TestTaskRunner_Pre06ScriptCheck runs pre06ScriptCheck against a matrix of
// version, driver and check type. Each case is a subtest that wraps a single
// check of the given type in a single service and compares the result with
// the expected boolean.
func TestTaskRunner_Pre06ScriptCheck(t *testing.T) {
	t.Parallel()

	cases := []struct {
		ver       string
		driver    string
		checkType string
		exp       bool
	}{
		{"0.5.6", "exec", "script", true},
		{"0.5.6", "java", "script", true},
		{"0.5.6", "mock_driver", "script", true},
		{"0.5.9", "exec", "script", true},
		{"0.5.9", "java", "script", true},
		{"0.5.9", "mock_driver", "script", true},

		{"0.6.0dev", "exec", "script", false},
		{"0.6.0dev", "java", "script", false},
		{"0.6.0dev", "mock_driver", "script", false},
		{"0.6.0", "exec", "script", false},
		{"0.6.0", "java", "script", false},
		{"0.6.0", "mock_driver", "script", false},
		{"1.0.0", "exec", "script", false},
		{"1.0.0", "java", "script", false},
		{"1.0.0", "mock_driver", "script", false},

		{"0.5.6", "rkt", "script", false},
		{"0.5.6", "docker", "script", false},
		{"0.5.6", "qemu", "script", false},
		{"0.5.6", "raw_exec", "script", false},
		{"0.5.6", "invalid", "script", false},

		{"0.5.6", "exec", "tcp", false},
		{"0.5.6", "java", "tcp", false},
		{"0.5.6", "mock_driver", "tcp", false},
	}

	for _, c := range cases {
		name := fmt.Sprintf("%s %s %s returns %t", c.ver, c.driver, c.checkType, c.exp)
		ver, driver, checkType, exp := c.ver, c.driver, c.checkType, c.exp

		t.Run(name, func(t *testing.T) {
			check := &structs.ServiceCheck{Type: checkType}
			services := []*structs.Service{
				{Checks: []*structs.ServiceCheck{check}},
			}

			act := pre06ScriptCheck(ver, driver, services)
			if act != exp {
				t.Errorf("expected %t received %t", exp, act)
			}
		})
	}
}
// TestTaskRunner_interpolateServices feeds interpolateServices a task whose
// service and check fields are all ${...} placeholders, together with an
// environment that maps every variable to its own name, and asserts that each
// placeholder is replaced in the returned task.
func TestTaskRunner_interpolateServices(t *testing.T) {
	t.Parallel()

	// Input: every interpolated field holds a placeholder.
	rawCheck := &structs.ServiceCheck{
		Name:          "${checkname}",
		Type:          "${checktype}",
		Command:       "${checkcmd}",
		Args:          []string{"${checkarg}"},
		Path:          "${checkstr}",
		Protocol:      "${checkproto}",
		PortLabel:     "${checklabel}",
		InitialStatus: "${checkstatus}",
		Method:        "${checkmethod}",
		Header: map[string][]string{
			"${checkheaderk}": {"${checkheaderv}"},
		},
	}
	rawService := &structs.Service{
		Name:      "${name}",
		PortLabel: "${portlabel}",
		Tags:      []string{"${tags}"},
		Checks:    []*structs.ServiceCheck{rawCheck},
	}
	task := &structs.Task{
		Services: []*structs.Service{rawService},
	}

	// Environment: each variable resolves to its own name.
	vars := map[string]string{
		"name":         "name",
		"portlabel":    "portlabel",
		"tags":         "tags",
		"checkname":    "checkname",
		"checktype":    "checktype",
		"checkcmd":     "checkcmd",
		"checkarg":     "checkarg",
		"checkstr":     "checkstr",
		"checkpath":    "checkpath",
		"checkproto":   "checkproto",
		"checklabel":   "checklabel",
		"checkstatus":  "checkstatus",
		"checkmethod":  "checkmethod",
		"checkheaderk": "checkheaderk",
		"checkheaderv": "checkheaderv",
	}
	taskEnv := &env.TaskEnv{EnvMap: vars}

	interpTask := interpolateServices(taskEnv, task)

	// Expected output: the same structure with placeholders resolved.
	wantCheck := &structs.ServiceCheck{
		Name:          "checkname",
		Type:          "checktype",
		Command:       "checkcmd",
		Args:          []string{"checkarg"},
		Path:          "checkstr",
		Protocol:      "checkproto",
		PortLabel:     "checklabel",
		InitialStatus: "checkstatus",
		Method:        "checkmethod",
		Header: map[string][]string{
			"checkheaderk": {"checkheaderv"},
		},
	}
	wantService := &structs.Service{
		Name:      "name",
		PortLabel: "portlabel",
		Tags:      []string{"tags"},
		Checks:    []*structs.ServiceCheck{wantCheck},
	}
	exp := &structs.Task{
		Services: []*structs.Service{wantService},
	}

	diff := pretty.Diff(interpTask, exp)
	if len(diff) > 0 {
		t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
	}
}
// TestTaskRunner_ShutdownDelay checks that destroying a task with a
// ShutdownDelay first deregisters its service from Consul and only lets the
// task exit once at least the delay has elapsed.
func TestTaskRunner_ShutdownDelay(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Services[0].Tags = []string{"tag1"}
	task.Services = task.Services[:1] // only need 1 for this test
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "1000s",
	}

	// No shutdown escape hatch for this delay, so don't set it too high
	task.ShutdownDelay = 500 * time.Duration(testutil.TestMultiplier()) * time.Millisecond

	ctx := testTaskRunnerFromAlloc(t, true, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for the task to start
	testWaitForTaskToStart(t, ctx)

	// Wait for the single service to be registered with the expected tags.
	testutil.WaitForResult(func() (bool, error) {
		services, _ := ctx.consul.Services()
		if n := len(services); n != 1 {
			return false, fmt.Errorf("expected 1 service found %d", n)
		}
		for _, s := range services {
			if !reflect.DeepEqual(s.Tags, task.Services[0].Tags) {
				return false, fmt.Errorf("expected tags=%q but found %q",
					strings.Join(task.Services[0].Tags, ","), strings.Join(s.Tags, ","))
			}
		}
		return true, nil
	}, func(err error) {
		// Dump what was registered to make the failure diagnosable.
		services, _ := ctx.consul.Services()
		for _, s := range services {
			t.Logf("Service: %#v", s)
		}
		t.Fatalf("err: %v", err)
	})

	// Begin the tear down
	ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))

	destroyed := time.Now()

	// The service must be deregistered. Any remaining service is a failure,
	// not only the case where exactly one is left.
	testutil.WaitForResult(func() (bool, error) {
		services, _ := ctx.consul.Services()
		if n := len(services); n != 0 {
			return false, fmt.Errorf("expected 0 services found %d", n)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Wait for actual exit
	select {
	case <-ctx.tr.WaitCh():
	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
		t.Fatalf("timeout")
	}

	// It should be impossible to reach here in less time than the shutdown delay
	if elapsed := time.Since(destroyed); elapsed < task.ShutdownDelay {
		t.Fatalf("task exited before shutdown delay: exited after %v; delay is %v",
			elapsed, task.ShutdownDelay)
	}
}
// TestTaskRunner_CheckWatcher_Restart registers a check with check_restart
// enabled against a mock Consul agent whose status is set to critical. It
// asserts that the task is restarted twice, is then not restarted again, and
// ends up dead and marked failed.
func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()

	// A restart policy that lets the task fail within the test's runtime.
	group := alloc.Job.TaskGroups[0]
	group.RestartPolicy.Attempts = 2
	group.RestartPolicy.Interval = time.Minute
	group.RestartPolicy.Delay = 10 * time.Millisecond
	group.RestartPolicy.Mode = structs.RestartPolicyModeFail

	task := group.Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "0",
		"run_for":   "100s",
	}

	// Replace the first check with one that carries a check_restart stanza.
	failingCheck := &structs.ServiceCheck{
		Name:     "test-restarts",
		Type:     structs.ServiceCheckTCP,
		Interval: 50 * time.Millisecond,
		CheckRestart: &structs.CheckRestart{
			Limit: 2,
			Grace: 100 * time.Millisecond,
		},
	}
	task.Services[0].Checks[0] = failingCheck

	tc := testTaskRunnerFromAlloc(t, true, alloc)

	// Swap the runner's mock ServiceClient for a real ServiceClient that
	// talks to a mock agent reporting every check as critical.
	unhealthyAgent := consul.NewMockAgent()
	unhealthyAgent.SetStatus("critical")
	realClient := consul.NewServiceClient(unhealthyAgent, tc.tr.logger, true)
	go realClient.Run()
	defer realClient.Shutdown()

	tc.tr.consul = realClient
	tc.consul = nil // prevent accidental use of old mock

	tc.tr.MarkReceived()
	go tc.tr.Run()
	defer tc.Cleanup()

	exitTimeout := time.Duration(testutil.TestMultiplier()*15) * time.Second
	select {
	case <-tc.tr.WaitCh():
	case <-time.After(exitTimeout):
		t.Fatalf("timeout")
	}

	// Three start/kill cycles: the first two end in a restart, the last one
	// ends with the runner refusing to restart.
	expected := []string{"Received", "Task Setup"}
	for _, outcome := range []string{"Restarting", "Restarting", "Not Restarting"} {
		expected = append(expected, "Started", "Restart Signaled", "Killing", "Killed", outcome)
	}

	if n := len(tc.upd.events); n != len(expected) {
		t.Fatalf("should have %d ctx.updates found %d: %s", len(expected), n, tc.upd)
	}

	if got, want := tc.upd.state, structs.TaskStateDead; got != want {
		t.Fatalf("TaskState %v; want %v", got, want)
	}

	if failed := tc.upd.failed; !failed {
		t.Fatalf("expected failed")
	}

	// Report every mismatching event rather than stopping at the first.
	for i, want := range expected {
		if got := tc.upd.events[i].Type; got != want {
			t.Errorf("%.2d - Expected %q but found %q", i, want, got)
		}
	}
}
// TestTaskRunner_DriverNetwork asserts that a driver's network is properly
// used in services and checks.
//
// The mock driver is configured with driver IP 10.1.2.3 and port map http:80.
// Services and checks mix the "host" and "driver" address modes, and the test
// verifies the address and port each one is registered with in the mock
// Consul agent.
func TestTaskRunner_DriverNetwork(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code":       0,
		"run_for":         "100s",
		"driver_ip":       "10.1.2.3",
		"driver_port_map": "http:80",
	}

	// Create services and checks with custom address modes to exercise
	// address detection logic
	task.Services = []*structs.Service{
		{
			Name:        "host-service",
			PortLabel:   "http",
			AddressMode: "host",
			Checks: []*structs.ServiceCheck{
				{
					Name:        "driver-check",
					Type:        "tcp",
					PortLabel:   "1234",
					AddressMode: "driver",
				},
			},
		},
		{
			Name:        "driver-service",
			PortLabel:   "5678",
			AddressMode: "driver",
			Checks: []*structs.ServiceCheck{
				{
					Name:      "host-check",
					Type:      "tcp",
					PortLabel: "http",
				},
				{
					Name:        "driver-label-check",
					Type:        "tcp",
					PortLabel:   "http",
					AddressMode: "driver",
				},
			},
		},
	}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for the task to start
	testWaitForTaskToStart(t, ctx)

	testutil.WaitForResult(func() (bool, error) {
		services, _ := ctx.consul.Services()
		if n := len(services); n != 2 {
			return false, fmt.Errorf("expected 2 services, but found %d", n)
		}
		for _, s := range services {
			switch s.Service {
			case "host-service":
				// NOTE(review): 192.168.0.100 is presumably the host IP
				// of the mock allocation's network; confirm in mock.Alloc.
				if expected := "192.168.0.100"; s.Address != expected {
					return false, fmt.Errorf("expected host-service to have IP=%s but found %s",
						expected, s.Address)
				}
			case "driver-service":
				if expected := "10.1.2.3"; s.Address != expected {
					return false, fmt.Errorf("expected driver-service to have IP=%s but found %s",
						expected, s.Address)
				}
				if expected := 5678; s.Port != expected {
					return false, fmt.Errorf("expected driver-service to have port=%d but found %d",
						expected, s.Port)
				}
			default:
				return false, fmt.Errorf("unexpected service: %q", s.Service)
			}
		}

		checks := ctx.consul.CheckRegs()
		if n := len(checks); n != 3 {
			return false, fmt.Errorf("expected 3 checks, but found %d", n)
		}
		for _, check := range checks {
			switch check.Name {
			case "driver-check":
				// A numeric port label is used as-is with the driver IP.
				if expected := "10.1.2.3:1234"; check.TCP != expected {
					return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP)
				}
			case "driver-label-check":
				// The "http" label resolves through the driver port map.
				if expected := "10.1.2.3:80"; check.TCP != expected {
					return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP)
				}
			case "host-check":
				// Only the host IP prefix is checked, not the port.
				if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) {
					return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP)
				}
			default:
				return false, fmt.Errorf("unexpected check: %q", check.Name)
			}
		}

		return true, nil
	}, func(err error) {
		// Dump the registered state. t.Log is used rather than t.Logf
		// because the pretty-printed text is data, not a format string.
		services, _ := ctx.consul.Services()
		for _, s := range services {
			t.Log(pretty.Sprint("Service: ", s))
		}
		for _, c := range ctx.consul.CheckRegs() {
			t.Log(pretty.Sprint("Check: ", c))
		}
		t.Fatalf("error: %v", err)
	})
}