open-nomad/client/task_runner_test.go

package client

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strings"
"syscall"
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
)
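// testLogger returns a logger that writes to stderr when tests run in verbose
// mode and discards output otherwise.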
func testLogger() *log.Logger {
return prefixedTestLogger("")
}
func prefixedTestLogger(prefix string) *log.Logger {
if testing.Verbose() {
return log.New(os.Stderr, prefix, log.LstdFlags|log.Lmicroseconds)
}
return log.New(ioutil.Discard, "", 0)
}
// noRestartsTracker returns a RestartTracker that never restarts.
func noRestartsTracker() *RestartTracker {
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
return newRestartTracker(policy, structs.JobTypeBatch)
}
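// MockTaskStateUpdater records the task state and events reported by a
// TaskRunner so tests can assert against them.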
type MockTaskStateUpdater struct {
state string
failed bool
events []*structs.TaskEvent
}
func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) {
if state != "" {
m.state = state
}
if event != nil {
if event.FailsTask {
m.failed = true
}
m.events = append(m.events, event)
}
}
// String for debugging purposes.
func (m *MockTaskStateUpdater) String() string {
s := fmt.Sprintf("Updates:\n state=%q\n failed=%t\n events=\n", m.state, m.failed)
for _, e := range m.events {
s += fmt.Sprintf(" %#v\n", e)
}
return s
}
type taskRunnerTestCtx struct {
upd *MockTaskStateUpdater
tr *TaskRunner
allocDir *allocdir.AllocDir
vault *vaultclient.MockVaultClient
consul *consul.MockAgent
consulClient *consul.ServiceClient
}
// Cleanup shuts down the Consul service client and destroys the task runner
// and alloc dir.
func (ctx *taskRunnerTestCtx) Cleanup() {
ctx.consulClient.Shutdown()
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
ctx.allocDir.Destroy()
}
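// testTaskRunner creates a task runner test context from mock.Alloc() with the
// first task switched to the mock driver.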
func testTaskRunner(t *testing.T, restarts bool) *taskRunnerTestCtx {
// Use mock driver
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"
return testTaskRunnerFromAlloc(t, restarts, alloc)
}
// testTaskRunnerFromAlloc creates a mock task runner using the first task in
// the first task group of the passed allocation.
//
// Callers should defer Cleanup() to clean up after completion.
func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocation) *taskRunnerTestCtx {
logger := testLogger()
conf := config.DefaultConfig()
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")
if err != nil {
t.Fatalf("error creating state db file: %v", err)
}
db, err := bolt.Open(tmp.Name(), 0600, nil)
if err != nil {
t.Fatalf("error creating state db: %v", err)
}
upd := &MockTaskStateUpdater{}
task := alloc.Job.TaskGroups[0].Tasks[0]
allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(conf.AllocDir, alloc.ID))
if err := allocDir.Build(); err != nil {
t.Fatalf("error building alloc dir: %v", err)
return nil
}
// HACK to get FSIsolation and chroot without using AllocRunner,
// TaskRunner, or Drivers
fsi := cstructs.FSIsolationImage
switch task.Driver {
case "raw_exec":
fsi = cstructs.FSIsolationNone
case "exec", "java":
fsi = cstructs.FSIsolationChroot
}
taskDir := allocDir.NewTaskDir(task.Name)
if err := taskDir.Build(false, config.DefaultChrootEnv, fsi); err != nil {
t.Fatalf("error building task dir %q: %v", task.Name, err)
return nil
}
vclient := vaultclient.NewMockVaultClient()
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, logger)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
tr.restartTracker = noRestartsTracker()
}
return &taskRunnerTestCtx{
upd: upd,
tr: tr,
allocDir: allocDir,
vault: vclient,
consul: cclient,
consulClient: serviceClient,
}
}
// testWaitForTaskToStart waits for the task to start or fails the test.
func testWaitForTaskToStart(t *testing.T, ctx *taskRunnerTestCtx) {
// Wait for the task to start
testutil.WaitForResult(func() (bool, error) {
l := len(ctx.upd.events)
if l < 2 {
return false, fmt.Errorf("Expect at least two events; got %v", l)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if l >= 3 {
if ctx.upd.events[1].Type != structs.TaskSetup {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
} else {
if ctx.upd.events[1].Type != structs.TaskStarted {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskStarted)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
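// TestTaskRunner_SimpleRun asserts a task runs to completion and reports the
// expected Received, Setup, Started, and Terminated events.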
func TestTaskRunner_SimpleRun(t *testing.T) {
t.Parallel()
ctx := testTaskRunner(t, false)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
event := ctx.upd.events[0]
if event.Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
event = ctx.upd.events[1]
if event.Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
displayMsg := event.DisplayMessage
if displayMsg != "Building Task Directory" {
t.Fatalf("Bad display message:%v", displayMsg)
}
event = ctx.upd.events[2]
if event.Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
displayMsg = event.DisplayMessage
if displayMsg != "Task started by client" {
t.Fatalf("Bad display message:%v", displayMsg)
}
event = ctx.upd.events[3]
if event.Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", event.Type, structs.TaskTerminated)
}
displayMsg = event.DisplayMessage
if displayMsg != "Exit Code: 0" {
t.Fatalf("Bad display message:%v", displayMsg)
}
if event.Details["exit_code"] != "0" {
t.Fatalf("Bad details map :%v", event.Details)
}
}
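// TestTaskRunner_Run_RecoverableStartError asserts that a recoverable error
// while starting the driver results in the task being restarted.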
func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": 0,
"start_error": "driver failure",
"start_error_recoverable": true,
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
testutil.WaitForResult(func() (bool, error) {
if l := len(ctx.upd.events); l < 4 {
return false, fmt.Errorf("Expect at least four events; got %v", l)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskDriverFailure {
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDriverFailure)
}
if ctx.upd.events[3].Type != structs.TaskRestarting {
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestarting)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
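// TestTaskRunner_Destroy asserts that destroying a running task kills it,
// ending with Killing and Killed events.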
func TestTaskRunner_Destroy(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1000s",
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
// Begin the tear down
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 5 {
t.Fatalf("should have 5 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[3].Type != structs.TaskKilling {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
}
if ctx.upd.events[4].Type != structs.TaskKilled {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
}
}
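// TestTaskRunner_Update asserts that an allocation update is applied in place:
// the task and restart policy are updated, the driver handle is refreshed, and
// Consul checks are re-interpolated (#2180).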
func TestTaskRunner_Update(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Checks[0] = &structs.ServiceCheck{
Name: "http-check",
Type: "http",
PortLabel: "http",
Path: "${NOMAD_META_foo}",
}
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "100s",
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
testWaitForTaskToStart(t, ctx)
// Update the task definition
updateAlloc := ctx.tr.alloc.Copy()
// Update the restart policy
newTG := updateAlloc.Job.TaskGroups[0]
newMode := "foo"
newTG.RestartPolicy.Mode = newMode
newTask := newTG.Tasks[0]
newTask.Driver = "mock_driver"
// Update meta to make sure service checks are interpolated correctly
// #2180
newTask.Meta["foo"] = "/UPDATE"
// Update the kill timeout
oldHandle := ctx.tr.handle.ID()
newTask.KillTimeout = time.Hour
ctx.tr.Update(updateAlloc)
// Wait for the update to take place
testutil.WaitForResult(func() (bool, error) {
if ctx.tr.task == newTask {
return false, fmt.Errorf("We copied the pointer! This would be very bad")
}
if ctx.tr.task.Driver != newTask.Driver {
return false, fmt.Errorf("Task not copied")
}
if ctx.tr.restartTracker.policy.Mode != newMode {
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.policy.Mode)
}
if ctx.tr.handle.ID() == oldHandle {
return false, fmt.Errorf("handle not updated")
}
// Make sure Consul services were interpolated correctly during
// the update #2180
checks := ctx.consul.CheckRegs()
if n := len(checks); n != 1 {
return false, fmt.Errorf("expected 1 check but found %d", n)
}
for _, check := range checks {
if found := check.HTTP; !strings.HasSuffix(found, "/UPDATE") {
return false, fmt.Errorf("expected consul check path to end with /UPDATE but found: %q", found)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
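// TestTaskRunner_SaveRestoreState asserts that task runner state, including
// the Vault token, can be saved and restored by a new TaskRunner.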
func TestTaskRunner_SaveRestoreState(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "5s",
}
// Give it a Vault token
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to be running and then snapshot the state
testWaitForTaskToStart(t, ctx)
if err := ctx.tr.SaveState(); err != nil {
t.Fatalf("err: %v", err)
}
// Read the token from the file system
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
token := string(data)
if len(token) == 0 {
t.Fatalf("Token not written to disk")
}
// Create a new task runner
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault}
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update,
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul)
tr2.restartTracker = noRestartsTracker()
if _, err := tr2.RestoreState(); err != nil {
t.Fatalf("err: %v", err)
}
go tr2.Run()
defer tr2.Destroy(structs.NewTaskEvent(structs.TaskKilled))
// Destroy and wait
select {
case <-tr2.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
// Check that we recovered the token
if act := tr2.vaultFuture.Get(); act != token {
t.Fatalf("Vault token not properly recovered")
}
}
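// TestTaskRunner_Download_List asserts that all listed artifacts are
// downloaded into the task directory before the task starts.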
func TestTaskRunner_Download_List(t *testing.T) {
t.Parallel()
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("."))))
defer ts.Close()
// Create an allocation that has a task with a list of artifacts.
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
f1 := "task_runner_test.go"
f2 := "task_runner.go"
artifact1 := structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
}
artifact2 := structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2),
}
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 5 {
t.Fatalf("should have 5 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[3].Type != structs.TaskStarted {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskStarted)
}
if ctx.upd.events[4].Type != structs.TaskTerminated {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskTerminated)
}
// Check that both files exist.
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
t.Fatalf("%v not downloaded", f1)
}
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f2)); err != nil {
t.Fatalf("%v not downloaded", f2)
}
}
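// TestTaskRunner_Download_Retries asserts that failed artifact downloads are
// retried according to the restart policy before the task stops restarting.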
func TestTaskRunner_Download_Retries(t *testing.T) {
t.Parallel()
// Create an allocation that has a task with bad artifacts.
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
artifact := structs.TaskArtifact{
GetterSource: "http://127.0.0.1:0/foo/bar/baz",
}
task.Artifacts = []*structs.TaskArtifact{&artifact}
// Make the restart policy allow one restart
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
Attempts: 1,
Interval: 10 * time.Minute,
Delay: 1 * time.Second,
Mode: structs.RestartPolicyModeFail,
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 8 {
t.Fatalf("should have 8 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[3].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskArtifactDownloadFailed)
}
if ctx.upd.events[4].Type != structs.TaskRestarting {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskRestarting)
}
if ctx.upd.events[5].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[6].Type != structs.TaskArtifactDownloadFailed {
t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskArtifactDownloadFailed)
}
if ctx.upd.events[7].Type != structs.TaskNotRestarting {
t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskNotRestarting)
}
}
// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered from
// Consul when waiting to be retried.
func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
t.Parallel()
// Create an allocation that has a task with bad artifacts.
alloc := mock.Alloc()
// Make the restart policy allow one restart
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
Attempts: 1,
Interval: 10 * time.Minute,
Delay: time.Nanosecond,
Mode: structs.RestartPolicyModeFail,
}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "1",
"run_for": "1ns",
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
// Use mockConsulServiceClient
consul := newMockConsulServiceClient(t)
ctx.tr.consul = consul
ctx.tr.MarkReceived()
ctx.tr.Run()
defer ctx.Cleanup()
// Assert it is properly registered and unregistered
if expected := 4; len(consul.ops) != expected {
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops))
}
if consul.ops[0].op != "add" {
t.Errorf("expected first op to be add but found: %q", consul.ops[0].op)
}
if consul.ops[1].op != "remove" {
t.Errorf("expected second op to be remove but found: %q", consul.ops[1].op)
}
if consul.ops[2].op != "add" {
t.Errorf("expected third op to be add but found: %q", consul.ops[2].op)
}
if consul.ops[3].op != "remove" {
t.Errorf("expected fourth/final op to be remove but found: %q", consul.ops[3].op)
}
}
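// TestTaskRunner_Validate_UserEnforcement asserts that validateTask rejects
// blacklisted users (root) for the exec driver while allowing other users and
// drivers that do not enforce the blacklist.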
func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
t.Parallel()
ctx := testTaskRunner(t, false)
defer ctx.Cleanup()
// Try to run as root with exec.
ctx.tr.task.Driver = "exec"
ctx.tr.task.User = "root"
if err := ctx.tr.validateTask(); err == nil {
t.Fatalf("expected error running as root with exec")
}
// Try to run a non-blacklisted user with exec.
ctx.tr.task.Driver = "exec"
ctx.tr.task.User = "foobar"
if err := ctx.tr.validateTask(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Try to run as root with docker.
ctx.tr.task.Driver = "docker"
ctx.tr.task.User = "root"
if err := ctx.tr.validateTask(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
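// TestTaskRunner_RestartTask asserts that a user-triggered restart stops and
// restarts the task, producing the full restart event sequence before the
// final kill.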
func TestTaskRunner_RestartTask(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "100s",
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for it to start
go func() {
testWaitForTaskToStart(t, ctx)
ctx.tr.Restart("test", "restart", false)
// Wait for it to restart then kill
go func() {
// Wait for the task to start again
testutil.WaitForResult(func() (bool, error) {
if len(ctx.upd.events) != 8 {
return false, fmt.Errorf("task %q in alloc %q should have 8 updates: %#v", task.Name, alloc.ID, ctx.upd.events)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
ctx.tr.Kill("test", "restart", false)
}()
}()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 10 {
t.Fatalf("should have 10 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskRestartSignal {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestartSignal)
}
if ctx.upd.events[4].Type != structs.TaskKilling {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilling)
}
if ctx.upd.events[5].Type != structs.TaskKilled {
t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskKilled)
}
if ctx.upd.events[6].Type != structs.TaskRestarting {
t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskRestarting)
}
if ctx.upd.events[7].Type != structs.TaskStarted {
t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskStarted)
}
if ctx.upd.events[8].Type != structs.TaskKilling {
t.Fatalf("Ninth Event was %v; want %v", ctx.upd.events[8].Type, structs.TaskKilling)
}
if ctx.upd.events[9].Type != structs.TaskKilled {
t.Fatalf("Tenth Event was %v; want %v", ctx.upd.events[9].Type, structs.TaskKilled)
}
}
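// TestTaskRunner_KillTask asserts that killing a running task marks it failed
// and dead with Killing and Killed events.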
func TestTaskRunner_KillTask(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
go func() {
testWaitForTaskToStart(t, ctx)
ctx.tr.Kill("test", "kill", true)
}()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 5 {
t.Fatalf("should have 5 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if !ctx.upd.failed {
t.Fatalf("TaskState should be failed: %+v", ctx.upd)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskKilling {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
}
if ctx.upd.events[4].Type != structs.TaskKilled {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
}
}
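// TestTaskRunner_SignalFailure asserts that a driver error while signaling a
// task is returned to the caller.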
func TestTaskRunner_SignalFailure(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
"signal_error": "test forcing failure",
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
if err := ctx.tr.Signal("test", "test", syscall.SIGINT); err == nil {
t.Fatalf("Didn't receive error")
}
}
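// TestTaskRunner_BlockForVault asserts that a task with a Vault stanza stays
// pending until a token is derived, and that the token is written to the
// secrets dir and revoked when the task exits.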
func TestTaskRunner_BlockForVault(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
// Control when we get a Vault token
token := "1234"
waitCh := make(chan struct{})
handler := func(*structs.Allocation, []string) (map[string]string, error) {
<-waitCh
return map[string]string{task.Name: token}, nil
}
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
go ctx.tr.Run()
select {
case <-ctx.tr.WaitCh():
t.Fatalf("premature exit")
case <-time.After(1 * time.Second):
}
if len(ctx.upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStatePending {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
// Unblock
close(waitCh)
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
// Check that the token is on disk
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if act := string(data); act != token {
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
}
// Check the token was revoked
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
testutil.WaitForResult(func() (bool, error) {
if len(m.StoppedTokens) != 1 {
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
}
if a := m.StoppedTokens[0]; a != token {
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
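// TestTaskRunner_DeriveToken_Retry asserts that recoverable errors while
// deriving a Vault token are retried until a token is returned.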
func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
// Control when we get a Vault token
token := "1234"
count := 0
handler := func(*structs.Allocation, []string) (map[string]string, error) {
if count > 0 {
return map[string]string{task.Name: token}, nil
}
count++
return nil, structs.NewRecoverableError(fmt.Errorf("Want a retry"), true)
}
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
go ctx.tr.Run()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
// Check that the token is on disk
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(tokenPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if act := string(data); act != token {
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
}
// Check the token was revoked
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
testutil.WaitForResult(func() (bool, error) {
if len(m.StoppedTokens) != 1 {
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
}
if a := m.StoppedTokens[0]; a != token {
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
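// TestTaskRunner_DeriveToken_Unrecoverable asserts that an unrecoverable error
// while deriving a Vault token kills the task.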
func TestTaskRunner_DeriveToken_Unrecoverable(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
task.Vault = &structs.Vault{
Policies: []string{"default"},
ChangeMode: structs.VaultChangeModeRestart,
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
// Error the token derivation
vc := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
vc.SetDeriveTokenError(alloc.ID, []string{task.Name}, fmt.Errorf("Non recoverable"))
go ctx.tr.Run()
// Wait for the task to start
testutil.WaitForResult(func() (bool, error) {
if l := len(ctx.upd.events); l != 3 {
return false, fmt.Errorf("Expect 3 events; got %v", l)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskKilling {
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskKilling)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
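// TestTaskRunner_Template_Block asserts that a task with templates remains
// pending until the start is unblocked (done manually in this test).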
func TestTaskRunner_Template_Block(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Templates = []*structs.Template{
{
EmbeddedTmpl: "{{key \"foo\"}}",
DestPath: "local/test",
ChangeMode: structs.TemplateChangeModeNoop,
},
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
select {
case <-ctx.tr.WaitCh():
t.Fatalf("premature exit")
case <-time.After(1 * time.Second):
}
if len(ctx.upd.events) != 2 {
t.Fatalf("should have 2 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStatePending {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
// Unblock
ctx.tr.UnblockStart("test")
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
}
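// TestTaskRunner_Template_Artifact asserts that a template can be rendered
// from a file that was downloaded as an artifact.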
func TestTaskRunner_Template_Artifact(t *testing.T) {
t.Parallel()
dir, err := os.Getwd()
if err != nil {
t.Fatalf("bad: %v", err)
}
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Join(dir, ".."))))
defer ts.Close()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
// Create an allocation that has a task that renders a template from an
// artifact
f1 := "CHANGELOG.md"
artifact := structs.TaskArtifact{
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
}
task.Artifacts = []*structs.TaskArtifact{&artifact}
task.Templates = []*structs.Template{
{
SourcePath: "CHANGELOG.md",
DestPath: "local/test",
ChangeMode: structs.TemplateChangeModeNoop,
},
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
go ctx.tr.Run()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 5 {
t.Fatalf("should have 5 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
}
if ctx.upd.events[3].Type != structs.TaskStarted {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskStarted)
}
if ctx.upd.events[4].Type != structs.TaskTerminated {
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskTerminated)
}
// Check that both files exist.
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
t.Fatalf("%v not downloaded", f1)
}
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.LocalDir, "test")); err != nil {
t.Fatalf("template not rendered")
}
}
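// TestTaskRunner_Template_NewVaultToken asserts that when Vault token renewal
// fails a new token is derived, the template manager is rebuilt, and the old
// token is revoked.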
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
task.Templates = []*structs.Template{
{
EmbeddedTmpl: "{{key \"foo\"}}",
DestPath: "local/test",
ChangeMode: structs.TemplateChangeModeNoop,
},
}
task.Vault = &structs.Vault{Policies: []string{"default"}}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
go ctx.tr.Run()
// Wait for a Vault token
var token string
testutil.WaitForResult(func() (bool, error) {
if token = ctx.tr.vaultFuture.Get(); token == "" {
return false, fmt.Errorf("No Vault token")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Error the token renewal
renewalCh, ok := ctx.vault.RenewTokens[token]
if !ok {
t.Fatalf("no renewal channel")
}
originalManager := ctx.tr.templateManager
renewalCh <- fmt.Errorf("Test killing")
close(renewalCh)
// Wait for a new Vault token
var token2 string
testutil.WaitForResult(func() (bool, error) {
if token2 = ctx.tr.vaultFuture.Get(); token2 == "" || token2 == token {
return false, fmt.Errorf("No new Vault token")
}
if originalManager == ctx.tr.templateManager {
return false, fmt.Errorf("Template manager not updated")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check the token was revoked
testutil.WaitForResult(func() (bool, error) {
if len(ctx.vault.StoppedTokens) != 1 {
return false, fmt.Errorf("Expected a stopped token: %v", ctx.vault.StoppedTokens)
}
if a := ctx.vault.StoppedTokens[0]; a != token {
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
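// TestTaskRunner_VaultManager_Restart asserts that a failed Vault token
// renewal restarts the task when the Vault change mode is restart.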
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
task.Vault = &structs.Vault{
Policies: []string{"default"},
ChangeMode: structs.VaultChangeModeRestart,
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
go ctx.tr.Run()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
// Error the token renewal
renewalCh, ok := ctx.vault.RenewTokens[ctx.tr.vaultFuture.Get()]
if !ok {
t.Fatalf("no renewal channel")
}
renewalCh <- fmt.Errorf("Test killing")
close(renewalCh)
// Ensure a restart
testutil.WaitForResult(func() (bool, error) {
if l := len(ctx.upd.events); l != 8 {
return false, fmt.Errorf("Expect eight events; got %#v", ctx.upd.events)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskRestartSignal {
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestartSignal)
}
if ctx.upd.events[4].Type != structs.TaskKilling {
return false, fmt.Errorf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilling)
}
if ctx.upd.events[5].Type != structs.TaskKilled {
return false, fmt.Errorf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskKilled)
}
if ctx.upd.events[6].Type != structs.TaskRestarting {
return false, fmt.Errorf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskRestarting)
}
if ctx.upd.events[7].Type != structs.TaskStarted {
return false, fmt.Errorf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskStarted)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
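// TestTaskRunner_VaultManager_Signal asserts that a failed Vault token renewal
// signals the task when the Vault change mode is signal.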
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "10s",
}
task.Vault = &structs.Vault{
Policies: []string{"default"},
ChangeMode: structs.VaultChangeModeSignal,
ChangeSignal: "SIGUSR1",
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
// Error the token renewal
renewalCh, ok := ctx.vault.RenewTokens[ctx.tr.vaultFuture.Get()]
if !ok {
t.Fatalf("no renewal channel")
}
renewalCh <- fmt.Errorf("Test killing")
close(renewalCh)
// Ensure the task was signaled
testutil.WaitForResult(func() (bool, error) {
if l := len(ctx.upd.events); l != 4 {
return false, fmt.Errorf("Expect four events; got %#v", ctx.upd.events)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskSignaling {
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskSignaling)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestTaskRunner_SimpleRun_Dispatch asserts that a dispatched job's payload is
// written to the task's local directory.
func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "1s",
}
fileName := "test"
task.DispatchPayload = &structs.DispatchPayloadConfig{
File: fileName,
}
alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{}
// Add a compressed payload
expected := []byte("hello world")
compressed := snappy.Encode(nil, expected)
alloc.Job.Payload = compressed
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
defer ctx.allocDir.Destroy()
go ctx.tr.Run()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
if len(ctx.upd.events) != 4 {
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if ctx.upd.events[0].Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}
if ctx.upd.events[1].Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
if ctx.upd.events[2].Type != structs.TaskStarted {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
}
// Check that the file was written to disk properly
payloadPath := filepath.Join(ctx.tr.taskDir.LocalDir, fileName)
data, err := ioutil.ReadFile(payloadPath)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
if !reflect.DeepEqual(data, expected) {
t.Fatalf("Bad; got %v; want %v", string(data), string(expected))
}
}
// TestTaskRunner_CleanupEmpty ensures TaskRunner works when createdResources
// is empty.
func TestTaskRunner_CleanupEmpty(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
defer ctx.Cleanup()
ctx.tr.Run()
// Nothing failed, so createdResources should be empty
if len(ctx.tr.createdResources.Resources) != 0 {
t.Fatalf("createdResources should still be empty: %v", ctx.tr.createdResources)
}
}
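// TestTaskRunner_CleanupOK asserts that created resources are cleaned up even
// when the first cleanup attempt fails.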
func TestTaskRunner_CleanupOK(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "1",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x", "y"}
ctx.tr.createdResources.Resources["foo"] = []string{"z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we only failed once, createdResources should be empty
if len(ctx.tr.createdResources.Resources) > 0 {
t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources)
}
}
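// TestTaskRunner_CleanupFail asserts that resources that repeatedly fail to
// clean up remain in createdResources.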
func TestTaskRunner_CleanupFail(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
key := "ERR"
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.config.Options = map[string]string{
"cleanup_fail_on": key,
"cleanup_fail_num": "5",
}
ctx.tr.MarkReceived()
ctx.tr.createdResources.Resources[key] = []string{"x"}
ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"}
defer ctx.Cleanup()
ctx.tr.Run()
// Since we failed > 3 times, the failed key should remain
expected := map[string][]string{key: {"x"}}
if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) {
t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources)
}
}
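// TestTaskRunner_Pre06ScriptCheck asserts that pre06ScriptCheck flags script
// checks only for exec, java, and mock_driver tasks on Nomad versions older
// than 0.6.0.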
func TestTaskRunner_Pre06ScriptCheck(t *testing.T) {
t.Parallel()
run := func(ver, driver, checkType string, exp bool) (string, func(t *testing.T)) {
name := fmt.Sprintf("%s %s %s returns %t", ver, driver, checkType, exp)
return name, func(t *testing.T) {
services := []*structs.Service{
{
Checks: []*structs.ServiceCheck{
{
Type: checkType,
},
},
},
}
if act := pre06ScriptCheck(ver, driver, services); act != exp {
t.Errorf("expected %t received %t", exp, act)
}
}
}
t.Run(run("0.5.6", "exec", "script", true))
t.Run(run("0.5.6", "java", "script", true))
t.Run(run("0.5.6", "mock_driver", "script", true))
t.Run(run("0.5.9", "exec", "script", true))
t.Run(run("0.5.9", "java", "script", true))
t.Run(run("0.5.9", "mock_driver", "script", true))
t.Run(run("0.6.0dev", "exec", "script", false))
t.Run(run("0.6.0dev", "java", "script", false))
t.Run(run("0.6.0dev", "mock_driver", "script", false))
t.Run(run("0.6.0", "exec", "script", false))
t.Run(run("0.6.0", "java", "script", false))
t.Run(run("0.6.0", "mock_driver", "script", false))
t.Run(run("1.0.0", "exec", "script", false))
t.Run(run("1.0.0", "java", "script", false))
t.Run(run("1.0.0", "mock_driver", "script", false))
t.Run(run("0.5.6", "rkt", "script", false))
t.Run(run("0.5.6", "docker", "script", false))
t.Run(run("0.5.6", "qemu", "script", false))
t.Run(run("0.5.6", "raw_exec", "script", false))
t.Run(run("0.5.6", "invalid", "script", false))
t.Run(run("0.5.6", "exec", "tcp", false))
t.Run(run("0.5.6", "java", "tcp", false))
t.Run(run("0.5.6", "mock_driver", "tcp", false))
}
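// TestTaskRunner_interpolateServices asserts that service and check fields are
// interpolated using the task environment.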
func TestTaskRunner_interpolateServices(t *testing.T) {
t.Parallel()
task := &structs.Task{
Services: []*structs.Service{
{
Name: "${name}",
PortLabel: "${portlabel}",
Tags: []string{"${tags}"},
Checks: []*structs.ServiceCheck{
{
Name: "${checkname}",
Type: "${checktype}",
Command: "${checkcmd}",
Args: []string{"${checkarg}"},
Path: "${checkstr}",
Protocol: "${checkproto}",
PortLabel: "${checklabel}",
InitialStatus: "${checkstatus}",
Method: "${checkmethod}",
Header: map[string][]string{
"${checkheaderk}": {"${checkheaderv}"},
},
},
},
},
},
}
env := &env.TaskEnv{
EnvMap: map[string]string{
"name": "name",
"portlabel": "portlabel",
"tags": "tags",
"checkname": "checkname",
"checktype": "checktype",
"checkcmd": "checkcmd",
"checkarg": "checkarg",
"checkstr": "checkstr",
"checkpath": "checkpath",
"checkproto": "checkproto",
"checklabel": "checklabel",
"checkstatus": "checkstatus",
"checkmethod": "checkmethod",
"checkheaderk": "checkheaderk",
"checkheaderv": "checkheaderv",
},
}
interpTask := interpolateServices(env, task)
exp := &structs.Task{
Services: []*structs.Service{
{
Name: "name",
PortLabel: "portlabel",
Tags: []string{"tags"},
Checks: []*structs.ServiceCheck{
{
Name: "checkname",
Type: "checktype",
Command: "checkcmd",
Args: []string{"checkarg"},
Path: "checkstr",
Protocol: "checkproto",
PortLabel: "checklabel",
InitialStatus: "checkstatus",
Method: "checkmethod",
Header: map[string][]string{
"checkheaderk": {"checkheaderv"},
},
},
},
},
},
}
if diff := pretty.Diff(interpTask, exp); len(diff) > 0 {
t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
}
}
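// TestTaskRunner_ShutdownDelay asserts that Consul services are deregistered
// before the task is killed and that at least ShutdownDelay elapses between
// Destroy and task exit.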
func TestTaskRunner_ShutdownDelay(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Tags = []string{"tag1"}
task.Services = task.Services[:1] // only need 1 for this test
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1000s",
}
// No shutdown escape hatch for this delay, so don't set it too high
task.ShutdownDelay = 500 * time.Duration(testutil.TestMultiplier()) * time.Millisecond
ctx := testTaskRunnerFromAlloc(t, true, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
testutil.WaitForResult(func() (bool, error) {
services, _ := ctx.consul.Services()
if n := len(services); n != 1 {
return false, fmt.Errorf("expected 1 service found %d", n)
}
for _, s := range services {
if !reflect.DeepEqual(s.Tags, task.Services[0].Tags) {
return false, fmt.Errorf("expected tags=%q but found %q",
strings.Join(task.Services[0].Tags, ","), strings.Join(s.Tags, ","))
}
}
return true, nil
}, func(err error) {
services, _ := ctx.consul.Services()
for _, s := range services {
t.Logf("Service: %#v", s)
}
t.Fatalf("err: %v", err)
})
// Begin the tear down
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
destroyed := time.Now()
testutil.WaitForResult(func() (bool, error) {
services, _ := ctx.consul.Services()
if n := len(services); n == 1 {
return false, fmt.Errorf("expected 0 services found %d", n)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for actual exit
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
// It should be impossible to reach here in less time than the shutdown delay
if time.Now().Before(destroyed.Add(task.ShutdownDelay)) {
t.Fatalf("task exited before shutdown delay")
}
}
// TestTaskRunner_CheckWatcher_Restart asserts that when enabled an unhealthy
// Consul check will cause a task to restart following restart policy rules.
func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
// Make the restart policy fail within this test
tg := alloc.Job.TaskGroups[0]
tg.RestartPolicy.Attempts = 2
tg.RestartPolicy.Interval = 1 * time.Minute
tg.RestartPolicy.Delay = 10 * time.Millisecond
tg.RestartPolicy.Mode = structs.RestartPolicyModeFail
task := tg.Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": "0",
"run_for": "100s",
}
// Make the task register a check that fails
task.Services[0].Checks[0] = &structs.ServiceCheck{
Name: "test-restarts",
Type: structs.ServiceCheckTCP,
Interval: 50 * time.Millisecond,
CheckRestart: &structs.CheckRestart{
Limit: 2,
Grace: 100 * time.Millisecond,
},
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
// Replace the mock Consul ServiceClient with a real ServiceClient backed by
// a mock Consul agent whose checks are always unhealthy.
consulAgent := consul.NewMockAgent()
consulAgent.SetStatus("critical")
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger)
go consulClient.Run()
defer consulClient.Shutdown()
ctx.tr.consul = consulClient
ctx.consul = nil // prevent accidental use of old mock
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
select {
case <-ctx.tr.WaitCh():
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
t.Fatalf("timeout")
}
expected := []string{
"Received",
"Task Setup",
"Started",
"Restart Signaled",
"Killing",
"Killed",
"Restarting",
"Started",
"Restart Signaled",
"Killing",
"Killed",
"Restarting",
"Started",
"Restart Signaled",
"Killing",
"Killed",
"Not Restarting",
}
if n := len(ctx.upd.events); n != len(expected) {
t.Fatalf("should have %d updates, found %d: %s", len(expected), n, ctx.upd)
}
if ctx.upd.state != structs.TaskStateDead {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}
if !ctx.upd.failed {
t.Fatalf("expected failed")
}
for i, actual := range ctx.upd.events {
if actual.Type != expected[i] {
t.Errorf("%.2d - Expected %q but found %q", i, expected[i], actual.Type)
}
}
}
// TestTaskRunner_DriverNetwork asserts that a driver's network is properly
// used in services and checks.
func TestTaskRunner_DriverNetwork(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"exit_code": 0,
"run_for": "100s",
"driver_ip": "10.1.2.3",
"driver_port_map": "http:80",
}
// Create services and checks with custom address modes to exercise
// address detection logic
task.Services = []*structs.Service{
{
Name: "host-service",
PortLabel: "http",
AddressMode: "host",
Checks: []*structs.ServiceCheck{
{
Name: "driver-check",
Type: "tcp",
PortLabel: "1234",
AddressMode: "driver",
},
},
},
{
Name: "driver-service",
PortLabel: "5678",
AddressMode: "driver",
Checks: []*structs.ServiceCheck{
{
Name: "host-check",
Type: "tcp",
PortLabel: "http",
},
{
Name: "driver-label-check",
Type: "tcp",
PortLabel: "http",
AddressMode: "driver",
},
},
},
}
ctx := testTaskRunnerFromAlloc(t, false, alloc)
ctx.tr.MarkReceived()
go ctx.tr.Run()
defer ctx.Cleanup()
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
testutil.WaitForResult(func() (bool, error) {
services, _ := ctx.consul.Services()
if n := len(services); n != 2 {
return false, fmt.Errorf("expected 2 services, but found %d", n)
}
for _, s := range services {
switch s.Service {
case "host-service":
if expected := "192.168.0.100"; s.Address != expected {
return false, fmt.Errorf("expected host-service to have IP=%s but found %s",
expected, s.Address)
}
case "driver-service":
if expected := "10.1.2.3"; s.Address != expected {
return false, fmt.Errorf("expected driver-service to have IP=%s but found %s",
expected, s.Address)
}
if expected := 5678; s.Port != expected {
return false, fmt.Errorf("expected driver-service to have port=%d but found %d",
expected, s.Port)
}
default:
return false, fmt.Errorf("unexpected service: %q", s.Service)
}
}
checks := ctx.consul.CheckRegs()
if n := len(checks); n != 3 {
return false, fmt.Errorf("expected 3 checks, but found %d", n)
}
for _, check := range checks {
switch check.Name {
case "driver-check":
if expected := "10.1.2.3:1234"; check.TCP != expected {
return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP)
}
case "driver-label-check":
if expected := "10.1.2.3:80"; check.TCP != expected {
return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP)
}
case "host-check":
if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) {
return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP)
}
default:
return false, fmt.Errorf("unexpected check: %q", check.Name)
}
}
return true, nil
}, func(err error) {
services, _ := ctx.consul.Services()
for _, s := range services {
t.Logf(pretty.Sprint("Service: ", s))
}
for _, c := range ctx.consul.CheckRegs() {
t.Logf(pretty.Sprint("Check: ", c))
}
t.Fatalf("error: %v", err)
})
}