1a29811169
As part of deprecating legacy drivers, we're moving the env package to a new drivers/shared tree, as it is used by the modern docker and rkt driver packages, and is useful for 3rd party plugins.
2036 lines
57 KiB
Go
2036 lines
57 KiB
Go
// +build deprecated
|
|
|
|
package taskrunner
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"strings"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/boltdb/bolt"
|
|
"github.com/golang/snappy"
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
|
|
"github.com/hashicorp/nomad/client/config"
|
|
consulApi "github.com/hashicorp/nomad/client/consul"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/client/vaultclient"
|
|
"github.com/hashicorp/nomad/command/agent/consul"
|
|
"github.com/hashicorp/nomad/drivers/shared/env"
|
|
"github.com/hashicorp/nomad/helper/testlog"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/kr/pretty"
|
|
)
|
|
|
|
// Returns a tracker that never restarts.
|
|
func noRestartsTracker() *restarts.RestartTracker {
|
|
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
|
|
return restarts.NewRestartTracker(policy, structs.JobTypeBatch)
|
|
}
|
|
|
|
type MockTaskStateUpdater struct {
|
|
state string
|
|
failed bool
|
|
events []*structs.TaskEvent
|
|
}
|
|
|
|
func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) {
|
|
if state != "" {
|
|
m.state = state
|
|
}
|
|
if event != nil {
|
|
if event.FailsTask {
|
|
m.failed = true
|
|
}
|
|
m.events = append(m.events, event)
|
|
}
|
|
}
|
|
|
|
// String for debugging purposes.
|
|
func (m *MockTaskStateUpdater) String() string {
|
|
s := fmt.Sprintf("Updates:\n state=%q\n failed=%t\n events=\n", m.state, m.failed)
|
|
for _, e := range m.events {
|
|
s += fmt.Sprintf(" %#v\n", e)
|
|
}
|
|
return s
|
|
}
|
|
|
|
type taskRunnerTestCtx struct {
|
|
upd *MockTaskStateUpdater
|
|
tr *TaskRunner
|
|
allocDir *allocdir.AllocDir
|
|
vault *vaultclient.MockVaultClient
|
|
consul *consul.MockAgent
|
|
consulClient *consul.ServiceClient
|
|
}
|
|
|
|
// Cleanup calls Destroy on the task runner and alloc dir
|
|
func (ctx *taskRunnerTestCtx) Cleanup() {
|
|
ctx.consulClient.Shutdown()
|
|
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
|
ctx.allocDir.Destroy()
|
|
}
|
|
|
|
func testTaskRunner(t *testing.T, restarts bool) *taskRunnerTestCtx {
|
|
// Use mock driver
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"run_for": "500ms",
|
|
}
|
|
return testTaskRunnerFromAlloc(t, restarts, alloc)
|
|
}
|
|
|
|
// Creates a mock task runner using the first task in the first task group of
|
|
// the passed allocation.
|
|
//
|
|
// Callers should defer Cleanup() to cleanup after completion
|
|
func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocation) *taskRunnerTestCtx {
|
|
logger := testlog.Logger(t)
|
|
conf := config.DefaultConfig()
|
|
conf.Node = mock.Node()
|
|
conf.StateDir = os.TempDir()
|
|
conf.AllocDir = os.TempDir()
|
|
|
|
tmp, err := ioutil.TempFile("", "state-db")
|
|
if err != nil {
|
|
t.Fatalf("error creating state db file: %v", err)
|
|
}
|
|
db, err := bolt.Open(tmp.Name(), 0600, nil)
|
|
if err != nil {
|
|
t.Fatalf("error creating state db: %v", err)
|
|
}
|
|
|
|
upd := &MockTaskStateUpdater{}
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
|
|
allocDir := allocdir.NewAllocDir(testlog.Logger(t), filepath.Join(conf.AllocDir, alloc.ID))
|
|
if err := allocDir.Build(); err != nil {
|
|
t.Fatalf("error building alloc dir: %v", err)
|
|
return nil
|
|
}
|
|
|
|
//HACK to get FSIsolation and chroot without using AllocRunner,
|
|
// TaskRunner, or Drivers
|
|
fsi := cstructs.FSIsolationImage
|
|
switch task.Driver {
|
|
case "raw_exec":
|
|
fsi = cstructs.FSIsolationNone
|
|
case "exec", "java":
|
|
fsi = cstructs.FSIsolationChroot
|
|
}
|
|
taskDir := allocDir.NewTaskDir(task.Name)
|
|
if err := taskDir.Build(false, config.DefaultChrootEnv, fsi); err != nil {
|
|
t.Fatalf("error building task dir %q: %v", task.Name, err)
|
|
return nil
|
|
}
|
|
|
|
vclient := vaultclient.NewMockVaultClient()
|
|
cclient := consul.NewMockAgent()
|
|
serviceClient := consul.NewServiceClient(cclient, testlog.HCLogger(t), true)
|
|
go serviceClient.Run()
|
|
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
|
|
if !restarts {
|
|
tr.restartTracker = noRestartsTracker()
|
|
}
|
|
return &taskRunnerTestCtx{
|
|
upd: upd,
|
|
tr: tr,
|
|
allocDir: allocDir,
|
|
vault: vclient,
|
|
consul: cclient,
|
|
consulClient: serviceClient,
|
|
}
|
|
}
|
|
|
|
// testWaitForTaskToStart waits for the task to or fails the test
|
|
func testWaitForTaskToStart(t *testing.T, ctx *taskRunnerTestCtx) {
|
|
// Wait for the task to start
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
l := len(ctx.upd.events)
|
|
if l < 2 {
|
|
return false, fmt.Errorf("Expect two events; got %v", l)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if l >= 3 {
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
} else {
|
|
if ctx.upd.events[1].Type != structs.TaskStarted {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskStarted)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_SimpleRun(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testTaskRunner(t, false)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 4 {
|
|
t.Fatalf("should have 3 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
event := ctx.upd.events[0]
|
|
|
|
if event.Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
event = ctx.upd.events[1]
|
|
if event.Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
displayMsg := event.DisplayMessage
|
|
|
|
if displayMsg != "Building Task Directory" {
|
|
t.Fatalf("Bad display message:%v", displayMsg)
|
|
}
|
|
|
|
event = ctx.upd.events[2]
|
|
if event.Type != structs.TaskStarted {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
displayMsg = event.DisplayMessage
|
|
if displayMsg != "Task started by client" {
|
|
t.Fatalf("Bad display message:%v", displayMsg)
|
|
}
|
|
|
|
event = ctx.upd.events[3]
|
|
if event.Type != structs.TaskTerminated {
|
|
t.Fatalf("Third Event was %v; want %v", event.Type, structs.TaskTerminated)
|
|
}
|
|
displayMsg = event.DisplayMessage
|
|
if displayMsg != "Exit Code: 0" {
|
|
t.Fatalf("Bad display message:%v", displayMsg)
|
|
}
|
|
if event.Details["exit_code"] != "0" {
|
|
t.Fatalf("Bad details map :%v", event.Details)
|
|
}
|
|
|
|
}
|
|
|
|
func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": 0,
|
|
"start_error": "driver failure",
|
|
"start_error_recoverable": true,
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if l := len(ctx.upd.events); l < 4 {
|
|
return false, fmt.Errorf("Expect at least four events; got %v", l)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskDriverFailure {
|
|
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDriverFailure)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskRestarting {
|
|
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestarting)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_Destroy(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"run_for": "1000s",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
// Begin the tear down
|
|
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 5 {
|
|
t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskKilling {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskKilled {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Update(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Services[0].Checks[0] = &structs.ServiceCheck{
|
|
Name: "http-check",
|
|
Type: "http",
|
|
PortLabel: "http",
|
|
Path: "${NOMAD_META_foo}",
|
|
}
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"run_for": "100s",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
// Update the task definition
|
|
updateAlloc := ctx.tr.alloc.Copy()
|
|
|
|
// Update the restart policy
|
|
newTG := updateAlloc.Job.TaskGroups[0]
|
|
newMode := "foo"
|
|
newTG.RestartPolicy.Mode = newMode
|
|
|
|
newTask := newTG.Tasks[0]
|
|
newTask.Driver = "mock_driver"
|
|
|
|
// Update meta to make sure service checks are interpolated correctly
|
|
// #2180
|
|
newTask.Meta["foo"] = "/UPDATE"
|
|
|
|
// Update the kill timeout
|
|
oldHandle := ctx.tr.handle.ID()
|
|
newTask.KillTimeout = time.Hour
|
|
ctx.tr.Update(updateAlloc)
|
|
|
|
// Wait for ctx.update to take place
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if ctx.tr.task == newTask {
|
|
return false, fmt.Errorf("We copied the pointer! This would be very bad")
|
|
}
|
|
if ctx.tr.task.Driver != newTask.Driver {
|
|
return false, fmt.Errorf("Task not copied")
|
|
}
|
|
if ctx.tr.restartTracker.GetPolicy().Mode != newMode {
|
|
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.GetPolicy().Mode)
|
|
}
|
|
if ctx.tr.handle.ID() == oldHandle {
|
|
return false, fmt.Errorf("handle not ctx.updated")
|
|
}
|
|
|
|
// Make sure Consul services were interpolated correctly during
|
|
// the update #2180
|
|
checks := ctx.consul.CheckRegs()
|
|
if n := len(checks); n != 1 {
|
|
return false, fmt.Errorf("expected 1 check but found %d", n)
|
|
}
|
|
for _, check := range checks {
|
|
if found := check.HTTP; !strings.HasSuffix(found, "/UPDATE") {
|
|
return false, fmt.Errorf("expected consul check path to end with /UPDATE but found: %q", found)
|
|
}
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_SaveRestoreState(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "5s",
|
|
}
|
|
|
|
// Give it a Vault token
|
|
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to be running and then snapshot the state
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
if err := ctx.tr.SaveState(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Read the token from the file system
|
|
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
|
|
data, err := ioutil.ReadFile(tokenPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read file: %v", err)
|
|
}
|
|
token := string(data)
|
|
if len(token) == 0 {
|
|
t.Fatalf("Token not written to disk")
|
|
}
|
|
|
|
// Create a new task runner
|
|
task2 := &structs.Task{Name: ctx.tr.task.Name, Driver: ctx.tr.task.Driver, Vault: ctx.tr.task.Vault}
|
|
tr2 := NewTaskRunner(ctx.tr.logger, ctx.tr.config, ctx.tr.stateDB, ctx.upd.Update,
|
|
ctx.tr.taskDir, ctx.tr.alloc, task2, ctx.tr.vaultClient, ctx.tr.consul)
|
|
tr2.restartTracker = noRestartsTracker()
|
|
if _, err := tr2.RestoreState(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
go tr2.Run()
|
|
defer tr2.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
|
|
|
// Destroy and wait
|
|
select {
|
|
case <-tr2.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
// Check that we recovered the token
|
|
if act := tr2.vaultFuture.Get(); act != token {
|
|
t.Fatalf("Vault token not properly recovered")
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Download_List(t *testing.T) {
|
|
t.Parallel()
|
|
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("."))))
|
|
defer ts.Close()
|
|
|
|
// Create an allocation that has a task with a list of artifacts.
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
f1 := "task_runner_test.go"
|
|
f2 := "task_runner.go"
|
|
artifact1 := structs.TaskArtifact{
|
|
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
|
|
}
|
|
artifact2 := structs.TaskArtifact{
|
|
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2),
|
|
}
|
|
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 5 {
|
|
t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskStarted {
|
|
t.Fatalf("Forth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskTerminated)
|
|
}
|
|
|
|
// Check that both files exist.
|
|
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
|
|
t.Fatalf("%v not downloaded", f1)
|
|
}
|
|
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f2)); err != nil {
|
|
t.Fatalf("%v not downloaded", f2)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Download_Retries(t *testing.T) {
|
|
t.Parallel()
|
|
// Create an allocation that has a task with bad artifacts.
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
artifact := structs.TaskArtifact{
|
|
GetterSource: "http://127.0.0.1:0/foo/bar/baz",
|
|
}
|
|
task.Artifacts = []*structs.TaskArtifact{&artifact}
|
|
|
|
// Make the restart policy try one ctx.update
|
|
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
|
|
Attempts: 1,
|
|
Interval: 10 * time.Minute,
|
|
Delay: 1 * time.Second,
|
|
Mode: structs.RestartPolicyModeFail,
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 8 {
|
|
t.Fatalf("should have 8 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskArtifactDownloadFailed {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskArtifactDownloadFailed)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskRestarting {
|
|
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskRestarting)
|
|
}
|
|
|
|
if ctx.upd.events[5].Type != structs.TaskDownloadingArtifacts {
|
|
t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskDownloadingArtifacts)
|
|
}
|
|
|
|
if ctx.upd.events[6].Type != structs.TaskArtifactDownloadFailed {
|
|
t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskArtifactDownloadFailed)
|
|
}
|
|
|
|
if ctx.upd.events[7].Type != structs.TaskNotRestarting {
|
|
t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskNotRestarting)
|
|
}
|
|
}
|
|
|
|
// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered from
|
|
// Consul when waiting to be retried.
|
|
func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
|
|
t.Parallel()
|
|
// Create an allocation that has a task with bad artifacts.
|
|
alloc := mock.Alloc()
|
|
|
|
// Make the restart policy try one ctx.update
|
|
alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
|
|
Attempts: 1,
|
|
Interval: 10 * time.Minute,
|
|
Delay: time.Nanosecond,
|
|
Mode: structs.RestartPolicyModeFail,
|
|
}
|
|
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "1",
|
|
"run_for": "1ns",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
|
|
// Use mockConsulServiceClient
|
|
consul := consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t))
|
|
ctx.tr.consul = consul
|
|
|
|
ctx.tr.MarkReceived()
|
|
ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Assert it is properly registered and unregistered
|
|
if expected := 6; len(consul.Ops) != expected {
|
|
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.Ops))
|
|
}
|
|
if consul.Ops[0].Op != "add" {
|
|
t.Errorf("expected first Op to be add but found: %q", consul.Ops[0].Op)
|
|
}
|
|
if consul.Ops[1].Op != "remove" {
|
|
t.Errorf("expected second op to be remove but found: %q", consul.Ops[1].Op)
|
|
}
|
|
if consul.Ops[2].Op != "remove" {
|
|
t.Errorf("expected third op to be remove but found: %q", consul.Ops[2].Op)
|
|
}
|
|
if consul.Ops[3].Op != "add" {
|
|
t.Errorf("expected fourth op to be add but found: %q", consul.Ops[3].Op)
|
|
}
|
|
if consul.Ops[4].Op != "remove" {
|
|
t.Errorf("expected fifth op to be remove but found: %q", consul.Ops[4].Op)
|
|
}
|
|
if consul.Ops[5].Op != "remove" {
|
|
t.Errorf("expected sixth op to be remove but found: %q", consul.Ops[5].Op)
|
|
}
|
|
}
|
|
|
|
//XXX Ported to allocrunner/task_runner/validate_hook_test.go
|
|
func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := testTaskRunner(t, false)
|
|
defer ctx.Cleanup()
|
|
|
|
// Try to run as root with exec.
|
|
ctx.tr.task.Driver = "exec"
|
|
ctx.tr.task.User = "root"
|
|
if err := ctx.tr.validateTask(); err == nil {
|
|
t.Fatalf("expected error running as root with exec")
|
|
}
|
|
|
|
// Try to run a non-blacklisted user with exec.
|
|
ctx.tr.task.Driver = "exec"
|
|
ctx.tr.task.User = "foobar"
|
|
if err := ctx.tr.validateTask(); err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
// Try to run as root with docker.
|
|
ctx.tr.task.Driver = "docker"
|
|
ctx.tr.task.User = "root"
|
|
if err := ctx.tr.validateTask(); err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_RestartTask(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "100s",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for it to start
|
|
go func() {
|
|
testWaitForTaskToStart(t, ctx)
|
|
ctx.tr.Restart("test", "restart", false)
|
|
|
|
// Wait for it to restart then kill
|
|
go func() {
|
|
// Wait for the task to start again
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if len(ctx.upd.events) != 8 {
|
|
return false, fmt.Errorf("task %q in alloc %q should have 8 ctx.updates: %#v", task.Name, alloc.ID, ctx.upd.events)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
ctx.tr.Kill("test", "restart", false)
|
|
}()
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 10 {
|
|
t.Fatalf("should have 10 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskRestartSignal {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestartSignal)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskKilling {
|
|
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilling)
|
|
}
|
|
|
|
if ctx.upd.events[5].Type != structs.TaskKilled {
|
|
t.Fatalf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskKilled)
|
|
}
|
|
|
|
if ctx.upd.events[6].Type != structs.TaskRestarting {
|
|
t.Fatalf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskRestarting)
|
|
}
|
|
|
|
if ctx.upd.events[7].Type != structs.TaskStarted {
|
|
t.Fatalf("Eighth Event was %v; want %v", ctx.upd.events[8].Type, structs.TaskStarted)
|
|
}
|
|
if ctx.upd.events[8].Type != structs.TaskKilling {
|
|
t.Fatalf("Ninth Event was %v; want %v", ctx.upd.events[8].Type, structs.TaskKilling)
|
|
}
|
|
|
|
if ctx.upd.events[9].Type != structs.TaskKilled {
|
|
t.Fatalf("Tenth Event was %v; want %v", ctx.upd.events[9].Type, structs.TaskKilled)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_KillTask(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
go func() {
|
|
testWaitForTaskToStart(t, ctx)
|
|
ctx.tr.Kill("test", "kill", true)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 5 {
|
|
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if !ctx.upd.failed {
|
|
t.Fatalf("TaskState should be failed: %+v", ctx.upd)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskKilling {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskKilling)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskKilled {
|
|
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilled)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_SignalFailure(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
"signal_error": "test forcing failure",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
if err := ctx.tr.Signal("test", "test", syscall.SIGINT); err == nil {
|
|
t.Fatalf("Didn't receive error")
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_BlockForVault(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
|
|
// Control when we get a Vault token
|
|
token := "1234"
|
|
waitCh := make(chan struct{})
|
|
handler := func(*structs.Allocation, []string) (map[string]string, error) {
|
|
<-waitCh
|
|
return map[string]string{task.Name: token}, nil
|
|
}
|
|
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
|
|
|
|
go ctx.tr.Run()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
t.Fatalf("premature exit")
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
if len(ctx.upd.events) != 2 {
|
|
t.Fatalf("should have 2 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStatePending {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
// Unblock
|
|
close(waitCh)
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 4 {
|
|
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
|
|
}
|
|
|
|
// Check that the token is on disk
|
|
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
|
|
data, err := ioutil.ReadFile(tokenPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read file: %v", err)
|
|
}
|
|
|
|
if act := string(data); act != token {
|
|
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
|
|
}
|
|
|
|
// Check the token was revoked
|
|
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if len(m.StoppedTokens) != 1 {
|
|
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
|
|
}
|
|
|
|
if a := m.StoppedTokens[0]; a != token {
|
|
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_DeriveToken_Retry(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
|
|
// Control when we get a Vault token
|
|
token := "1234"
|
|
count := 0
|
|
handler := func(*structs.Allocation, []string) (map[string]string, error) {
|
|
if count > 0 {
|
|
return map[string]string{task.Name: token}, nil
|
|
}
|
|
|
|
count++
|
|
return nil, structs.NewRecoverableError(fmt.Errorf("Want a retry"), true)
|
|
}
|
|
ctx.tr.vaultClient.(*vaultclient.MockVaultClient).DeriveTokenFn = handler
|
|
go ctx.tr.Run()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 4 {
|
|
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
|
|
}
|
|
|
|
// Check that the token is on disk
|
|
tokenPath := filepath.Join(ctx.tr.taskDir.SecretsDir, vaultTokenFile)
|
|
data, err := ioutil.ReadFile(tokenPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read file: %v", err)
|
|
}
|
|
|
|
if act := string(data); act != token {
|
|
t.Fatalf("Token didn't get written to disk properly, got %q; want %q", act, token)
|
|
}
|
|
|
|
// Check the token was revoked
|
|
m := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if len(m.StoppedTokens) != 1 {
|
|
return false, fmt.Errorf("Expected a stopped token: %v", m.StoppedTokens)
|
|
}
|
|
|
|
if a := m.StoppedTokens[0]; a != token {
|
|
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_DeriveToken_Unrecoverable(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
task.Vault = &structs.Vault{
|
|
Policies: []string{"default"},
|
|
ChangeMode: structs.VaultChangeModeRestart,
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
|
|
// Error the token derivation
|
|
vc := ctx.tr.vaultClient.(*vaultclient.MockVaultClient)
|
|
vc.SetDeriveTokenError(alloc.ID, []string{task.Name}, fmt.Errorf("Non recoverable"))
|
|
go ctx.tr.Run()
|
|
|
|
// Wait for the task to start
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if l := len(ctx.upd.events); l != 3 {
|
|
return false, fmt.Errorf("Expect 3 events; got %v", l)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskKilling {
|
|
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskKilling)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_Template_Block(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
task.Templates = []*structs.Template{
|
|
{
|
|
EmbeddedTmpl: "{{key \"foo\"}}",
|
|
DestPath: "local/test",
|
|
ChangeMode: structs.TemplateChangeModeNoop,
|
|
},
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
t.Fatalf("premature exit")
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
if len(ctx.upd.events) != 2 {
|
|
t.Fatalf("should have 2 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStatePending {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStatePending)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
// Unblock
|
|
ctx.tr.UnblockStart("test")
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 4 {
|
|
t.Fatalf("should have 4 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Template_Artifact(t *testing.T) {
|
|
t.Parallel()
|
|
dir, err := os.Getwd()
|
|
if err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
|
|
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Join(dir, "../../.."))))
|
|
defer ts.Close()
|
|
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
// Create an allocation that has a task that renders a template from an
|
|
// artifact
|
|
f1 := "CHANGELOG.md"
|
|
artifact := structs.TaskArtifact{
|
|
GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1),
|
|
}
|
|
task.Artifacts = []*structs.TaskArtifact{&artifact}
|
|
task.Templates = []*structs.Template{
|
|
{
|
|
SourcePath: "CHANGELOG.md",
|
|
DestPath: "local/test",
|
|
ChangeMode: structs.TemplateChangeModeNoop,
|
|
},
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
go ctx.tr.Run()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 5 {
|
|
t.Fatalf("should have 5 ctx.updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskDownloadingArtifacts {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskDownloadingArtifacts)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskStarted {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskTerminated)
|
|
}
|
|
|
|
// Check that both files exist.
|
|
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.Dir, f1)); err != nil {
|
|
t.Fatalf("%v not downloaded", f1)
|
|
}
|
|
if _, err := os.Stat(filepath.Join(ctx.tr.taskDir.LocalDir, "test")); err != nil {
|
|
t.Fatalf("template not rendered")
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
task.Templates = []*structs.Template{
|
|
{
|
|
EmbeddedTmpl: "{{key \"foo\"}}",
|
|
DestPath: "local/test",
|
|
ChangeMode: structs.TemplateChangeModeNoop,
|
|
},
|
|
}
|
|
task.Vault = &structs.Vault{Policies: []string{"default"}}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
go ctx.tr.Run()
|
|
|
|
// Wait for a Vault token
|
|
var token string
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if token = ctx.tr.vaultFuture.Get(); token == "" {
|
|
return false, fmt.Errorf("No Vault token")
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Error the token renewal
|
|
renewalCh, ok := ctx.vault.RenewTokens[token]
|
|
if !ok {
|
|
t.Fatalf("no renewal channel")
|
|
}
|
|
|
|
originalManager := ctx.tr.templateManager
|
|
|
|
renewalCh <- fmt.Errorf("Test killing")
|
|
close(renewalCh)
|
|
|
|
// Wait for a new Vault token
|
|
var token2 string
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if token2 = ctx.tr.vaultFuture.Get(); token2 == "" || token2 == token {
|
|
return false, fmt.Errorf("No new Vault token")
|
|
}
|
|
|
|
if originalManager == ctx.tr.templateManager {
|
|
return false, fmt.Errorf("Template manager not ctx.updated")
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Check the token was revoked
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if len(ctx.vault.StoppedTokens) != 1 {
|
|
return false, fmt.Errorf("Expected a stopped token: %v", ctx.vault.StoppedTokens)
|
|
}
|
|
|
|
if a := ctx.vault.StoppedTokens[0]; a != token {
|
|
return false, fmt.Errorf("got stopped token %q; want %q", a, token)
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
task.Vault = &structs.Vault{
|
|
Policies: []string{"default"},
|
|
ChangeMode: structs.VaultChangeModeRestart,
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.Cleanup()
|
|
go ctx.tr.Run()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
// Error the token renewal
|
|
renewalCh, ok := ctx.vault.RenewTokens[ctx.tr.vaultFuture.Get()]
|
|
if !ok {
|
|
t.Fatalf("no renewal channel")
|
|
}
|
|
|
|
renewalCh <- fmt.Errorf("Test killing")
|
|
close(renewalCh)
|
|
|
|
// Ensure a restart
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if l := len(ctx.upd.events); l != 8 {
|
|
return false, fmt.Errorf("Expect eight events; got %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskRestartSignal {
|
|
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskRestartSignal)
|
|
}
|
|
|
|
if ctx.upd.events[4].Type != structs.TaskKilling {
|
|
return false, fmt.Errorf("Fifth Event was %v; want %v", ctx.upd.events[4].Type, structs.TaskKilling)
|
|
}
|
|
|
|
if ctx.upd.events[5].Type != structs.TaskKilled {
|
|
return false, fmt.Errorf("Sixth Event was %v; want %v", ctx.upd.events[5].Type, structs.TaskKilled)
|
|
}
|
|
|
|
if ctx.upd.events[6].Type != structs.TaskRestarting {
|
|
return false, fmt.Errorf("Seventh Event was %v; want %v", ctx.upd.events[6].Type, structs.TaskRestarting)
|
|
}
|
|
|
|
if ctx.upd.events[7].Type != structs.TaskStarted {
|
|
return false, fmt.Errorf("Eight Event was %v; want %v", ctx.upd.events[7].Type, structs.TaskStarted)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "10s",
|
|
}
|
|
task.Vault = &structs.Vault{
|
|
Policies: []string{"default"},
|
|
ChangeMode: structs.VaultChangeModeSignal,
|
|
ChangeSignal: "SIGUSR1",
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
// Error the token renewal
|
|
renewalCh, ok := ctx.vault.RenewTokens[ctx.tr.vaultFuture.Get()]
|
|
if !ok {
|
|
t.Fatalf("no renewal channel")
|
|
}
|
|
|
|
renewalCh <- fmt.Errorf("Test killing")
|
|
close(renewalCh)
|
|
|
|
// Ensure a restart
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
if l := len(ctx.upd.events); l != 4 {
|
|
return false, fmt.Errorf("Expect four events; got %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
return false, fmt.Errorf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
return false, fmt.Errorf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
return false, fmt.Errorf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskSignaling {
|
|
return false, fmt.Errorf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskSignaling)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
// Test that the payload is written to disk
|
|
func TestTaskRunner_SimpleRun_Dispatch(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "1s",
|
|
}
|
|
fileName := "test"
|
|
task.DispatchPayload = &structs.DispatchPayloadConfig{
|
|
File: fileName,
|
|
}
|
|
alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
|
|
|
// Add an encrypted payload
|
|
expected := []byte("hello world")
|
|
compressed := snappy.Encode(nil, expected)
|
|
alloc.Job.Payload = compressed
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
defer ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
|
defer ctx.allocDir.Destroy()
|
|
go ctx.tr.Run()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
if len(ctx.upd.events) != 4 {
|
|
t.Fatalf("should have 4 updates: %#v", ctx.upd.events)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if ctx.upd.events[0].Type != structs.TaskReceived {
|
|
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
|
|
}
|
|
|
|
if ctx.upd.events[1].Type != structs.TaskSetup {
|
|
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
|
|
}
|
|
|
|
if ctx.upd.events[2].Type != structs.TaskStarted {
|
|
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
|
|
}
|
|
|
|
if ctx.upd.events[3].Type != structs.TaskTerminated {
|
|
t.Fatalf("Fourth Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
|
|
}
|
|
|
|
// Check that the file was written to disk properly
|
|
payloadPath := filepath.Join(ctx.tr.taskDir.LocalDir, fileName)
|
|
data, err := ioutil.ReadFile(payloadPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read file: %v", err)
|
|
}
|
|
if !reflect.DeepEqual(data, expected) {
|
|
t.Fatalf("Bad; got %v; want %v", string(data), string(expected))
|
|
}
|
|
}
|
|
|
|
// TestTaskRunner_CleanupEmpty ensures TaskRunner works when createdResources
|
|
// is empty.
|
|
func TestTaskRunner_CleanupEmpty(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
|
|
defer ctx.Cleanup()
|
|
ctx.tr.Run()
|
|
|
|
// Since we only failed once, createdResources should be empty
|
|
if len(ctx.tr.createdResources.Resources) != 0 {
|
|
t.Fatalf("createdResources should still be empty: %v", ctx.tr.createdResources)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_CleanupOK(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
key := "ERR"
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.config.Options = map[string]string{
|
|
"cleanup_fail_on": key,
|
|
"cleanup_fail_num": "1",
|
|
}
|
|
ctx.tr.MarkReceived()
|
|
|
|
ctx.tr.createdResources.Resources[key] = []string{"x", "y"}
|
|
ctx.tr.createdResources.Resources["foo"] = []string{"z"}
|
|
|
|
defer ctx.Cleanup()
|
|
ctx.tr.Run()
|
|
|
|
// Since we only failed once, createdResources should be empty
|
|
if len(ctx.tr.createdResources.Resources) > 0 {
|
|
t.Fatalf("expected all created resources to be removed: %#v", ctx.tr.createdResources.Resources)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_CleanupFail(t *testing.T) {
|
|
t.Parallel()
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
key := "ERR"
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.config.Options = map[string]string{
|
|
"cleanup_fail_on": key,
|
|
"cleanup_fail_num": "5",
|
|
}
|
|
ctx.tr.MarkReceived()
|
|
|
|
ctx.tr.createdResources.Resources[key] = []string{"x"}
|
|
ctx.tr.createdResources.Resources["foo"] = []string{"y", "z"}
|
|
|
|
defer ctx.Cleanup()
|
|
ctx.tr.Run()
|
|
|
|
// Since we failed > 3 times, the failed key should remain
|
|
expected := map[string][]string{key: {"x"}}
|
|
if !reflect.DeepEqual(expected, ctx.tr.createdResources.Resources) {
|
|
t.Fatalf("expected %#v but found: %#v", expected, ctx.tr.createdResources.Resources)
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_Pre06ScriptCheck(t *testing.T) {
|
|
t.Parallel()
|
|
run := func(ver, driver, checkType string, exp bool) (string, func(t *testing.T)) {
|
|
name := fmt.Sprintf("%s %s %s returns %t", ver, driver, checkType, exp)
|
|
return name, func(t *testing.T) {
|
|
services := []*structs.Service{
|
|
{
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Type: checkType,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if act := pre06ScriptCheck(ver, driver, services); act != exp {
|
|
t.Errorf("expected %t received %t", exp, act)
|
|
}
|
|
}
|
|
}
|
|
t.Run(run("0.5.6", "exec", "script", true))
|
|
t.Run(run("0.5.6", "java", "script", true))
|
|
t.Run(run("0.5.6", "mock_driver", "script", true))
|
|
t.Run(run("0.5.9", "exec", "script", true))
|
|
t.Run(run("0.5.9", "java", "script", true))
|
|
t.Run(run("0.5.9", "mock_driver", "script", true))
|
|
|
|
t.Run(run("0.6.0dev", "exec", "script", false))
|
|
t.Run(run("0.6.0dev", "java", "script", false))
|
|
t.Run(run("0.6.0dev", "mock_driver", "script", false))
|
|
t.Run(run("0.6.0", "exec", "script", false))
|
|
t.Run(run("0.6.0", "java", "script", false))
|
|
t.Run(run("0.6.0", "mock_driver", "script", false))
|
|
t.Run(run("1.0.0", "exec", "script", false))
|
|
t.Run(run("1.0.0", "java", "script", false))
|
|
t.Run(run("1.0.0", "mock_driver", "script", false))
|
|
|
|
t.Run(run("0.5.6", "rkt", "script", false))
|
|
t.Run(run("0.5.6", "docker", "script", false))
|
|
t.Run(run("0.5.6", "qemu", "script", false))
|
|
t.Run(run("0.5.6", "raw_exec", "script", false))
|
|
t.Run(run("0.5.6", "invalid", "script", false))
|
|
|
|
t.Run(run("0.5.6", "exec", "tcp", false))
|
|
t.Run(run("0.5.6", "java", "tcp", false))
|
|
t.Run(run("0.5.6", "mock_driver", "tcp", false))
|
|
}
|
|
|
|
func TestTaskRunner_interpolateServices(t *testing.T) {
|
|
t.Parallel()
|
|
task := &structs.Task{
|
|
Services: []*structs.Service{
|
|
{
|
|
Name: "${name}",
|
|
PortLabel: "${portlabel}",
|
|
Tags: []string{"${tags}"},
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "${checkname}",
|
|
Type: "${checktype}",
|
|
Command: "${checkcmd}",
|
|
Args: []string{"${checkarg}"},
|
|
Path: "${checkstr}",
|
|
Protocol: "${checkproto}",
|
|
PortLabel: "${checklabel}",
|
|
InitialStatus: "${checkstatus}",
|
|
Method: "${checkmethod}",
|
|
Header: map[string][]string{
|
|
"${checkheaderk}": {"${checkheaderv}"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
env := &env.TaskEnv{
|
|
EnvMap: map[string]string{
|
|
"name": "name",
|
|
"portlabel": "portlabel",
|
|
"tags": "tags",
|
|
"checkname": "checkname",
|
|
"checktype": "checktype",
|
|
"checkcmd": "checkcmd",
|
|
"checkarg": "checkarg",
|
|
"checkstr": "checkstr",
|
|
"checkpath": "checkpath",
|
|
"checkproto": "checkproto",
|
|
"checklabel": "checklabel",
|
|
"checkstatus": "checkstatus",
|
|
"checkmethod": "checkmethod",
|
|
"checkheaderk": "checkheaderk",
|
|
"checkheaderv": "checkheaderv",
|
|
},
|
|
}
|
|
|
|
interpTask := interpolateServices(env, task)
|
|
|
|
exp := &structs.Task{
|
|
Services: []*structs.Service{
|
|
{
|
|
Name: "name",
|
|
PortLabel: "portlabel",
|
|
Tags: []string{"tags"},
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "checkname",
|
|
Type: "checktype",
|
|
Command: "checkcmd",
|
|
Args: []string{"checkarg"},
|
|
Path: "checkstr",
|
|
Protocol: "checkproto",
|
|
PortLabel: "checklabel",
|
|
InitialStatus: "checkstatus",
|
|
Method: "checkmethod",
|
|
Header: map[string][]string{
|
|
"checkheaderk": {"checkheaderv"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if diff := pretty.Diff(interpTask, exp); len(diff) > 0 {
|
|
t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
|
|
}
|
|
}
|
|
|
|
func TestTaskRunner_ShutdownDelay(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Services[0].Tags = []string{"tag1"}
|
|
task.Services = task.Services[:1] // only need 1 for this test
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"run_for": "1000s",
|
|
}
|
|
|
|
// No shutdown escape hatch for this delay, so don't set it too high
|
|
task.ShutdownDelay = 500 * time.Duration(testutil.TestMultiplier()) * time.Millisecond
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
services, _ := ctx.consul.Services()
|
|
if n := len(services); n != 1 {
|
|
return false, fmt.Errorf("expected 1 service found %d", n)
|
|
}
|
|
for _, s := range services {
|
|
if !reflect.DeepEqual(s.Tags, task.Services[0].Tags) {
|
|
return false, fmt.Errorf("expected tags=%q but found %q",
|
|
strings.Join(task.Services[0].Tags, ","), strings.Join(s.Tags, ","))
|
|
}
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
services, _ := ctx.consul.Services()
|
|
for _, s := range services {
|
|
t.Logf("Service: %#v", s)
|
|
}
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Begin the tear down
|
|
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
|
|
destroyed := time.Now()
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
services, _ := ctx.consul.Services()
|
|
if n := len(services); n == 1 {
|
|
return false, fmt.Errorf("expected 0 services found %d", n)
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Wait for actual exit
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
// It should be impossible to reach here in less time than the shutdown delay
|
|
if time.Now().Before(destroyed.Add(task.ShutdownDelay)) {
|
|
t.Fatalf("task exited before shutdown delay")
|
|
}
|
|
}
|
|
|
|
// TestTaskRunner_CheckWatcher_Restart asserts that when enabled an unhealthy
|
|
// Consul check will cause a task to restart following restart policy rules.
|
|
func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
alloc := mock.Alloc()
|
|
|
|
// Make the restart policy fail within this test
|
|
tg := alloc.Job.TaskGroups[0]
|
|
tg.RestartPolicy.Attempts = 2
|
|
tg.RestartPolicy.Interval = 1 * time.Minute
|
|
tg.RestartPolicy.Delay = 10 * time.Millisecond
|
|
tg.RestartPolicy.Mode = structs.RestartPolicyModeFail
|
|
|
|
task := tg.Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": "0",
|
|
"run_for": "100s",
|
|
}
|
|
|
|
// Make the task register a check that fails
|
|
task.Services[0].Checks[0] = &structs.ServiceCheck{
|
|
Name: "test-restarts",
|
|
Type: structs.ServiceCheckTCP,
|
|
Interval: 50 * time.Millisecond,
|
|
CheckRestart: &structs.CheckRestart{
|
|
Limit: 2,
|
|
Grace: 100 * time.Millisecond,
|
|
},
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, true, alloc)
|
|
|
|
// Replace mock Consul ServiceClient, with the real ServiceClient
|
|
// backed by a mock consul whose checks are always unhealthy.
|
|
consulAgent := consul.NewMockAgent()
|
|
consulAgent.SetStatus("critical")
|
|
consulClient := consul.NewServiceClient(consulAgent, testlog.HCLogger(t), true)
|
|
go consulClient.Run()
|
|
defer consulClient.Shutdown()
|
|
|
|
ctx.tr.consul = consulClient
|
|
ctx.consul = nil // prevent accidental use of old mock
|
|
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
select {
|
|
case <-ctx.tr.WaitCh():
|
|
case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
|
|
expected := []string{
|
|
"Received",
|
|
"Task Setup",
|
|
"Started",
|
|
"Restart Signaled",
|
|
"Killing",
|
|
"Killed",
|
|
"Restarting",
|
|
"Started",
|
|
"Restart Signaled",
|
|
"Killing",
|
|
"Killed",
|
|
"Restarting",
|
|
"Started",
|
|
"Restart Signaled",
|
|
"Killing",
|
|
"Killed",
|
|
"Not Restarting",
|
|
}
|
|
|
|
if n := len(ctx.upd.events); n != len(expected) {
|
|
t.Fatalf("should have %d ctx.updates found %d: %s", len(expected), n, ctx.upd)
|
|
}
|
|
|
|
if ctx.upd.state != structs.TaskStateDead {
|
|
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
|
|
}
|
|
|
|
if !ctx.upd.failed {
|
|
t.Fatalf("expected failed")
|
|
}
|
|
|
|
for i, actual := range ctx.upd.events {
|
|
if actual.Type != expected[i] {
|
|
t.Errorf("%.2d - Expected %q but found %q", i, expected[i], actual.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestTaskRunner_DriverNetwork asserts that a driver's network is properly
|
|
// used in services and checks.
|
|
func TestTaskRunner_DriverNetwork(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
alloc := mock.Alloc()
|
|
task := alloc.Job.TaskGroups[0].Tasks[0]
|
|
task.Driver = "mock_driver"
|
|
task.Config = map[string]interface{}{
|
|
"exit_code": 0,
|
|
"run_for": "100s",
|
|
"driver_ip": "10.1.2.3",
|
|
"driver_port_map": "http:80",
|
|
}
|
|
|
|
// Create services and checks with custom address modes to exercise
|
|
// address detection logic
|
|
task.Services = []*structs.Service{
|
|
{
|
|
Name: "host-service",
|
|
PortLabel: "http",
|
|
AddressMode: "host",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "driver-check",
|
|
Type: "tcp",
|
|
PortLabel: "1234",
|
|
AddressMode: "driver",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "driver-service",
|
|
PortLabel: "5678",
|
|
AddressMode: "driver",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "host-check",
|
|
Type: "tcp",
|
|
PortLabel: "http",
|
|
},
|
|
{
|
|
Name: "driver-label-check",
|
|
Type: "tcp",
|
|
PortLabel: "http",
|
|
AddressMode: "driver",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
ctx := testTaskRunnerFromAlloc(t, false, alloc)
|
|
ctx.tr.MarkReceived()
|
|
go ctx.tr.Run()
|
|
defer ctx.Cleanup()
|
|
|
|
// Wait for the task to start
|
|
testWaitForTaskToStart(t, ctx)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
services, _ := ctx.consul.Services()
|
|
if n := len(services); n != 2 {
|
|
return false, fmt.Errorf("expected 2 services, but found %d", n)
|
|
}
|
|
for _, s := range services {
|
|
switch s.Service {
|
|
case "host-service":
|
|
if expected := "192.168.0.100"; s.Address != expected {
|
|
return false, fmt.Errorf("expected host-service to have IP=%s but found %s",
|
|
expected, s.Address)
|
|
}
|
|
case "driver-service":
|
|
if expected := "10.1.2.3"; s.Address != expected {
|
|
return false, fmt.Errorf("expected driver-service to have IP=%s but found %s",
|
|
expected, s.Address)
|
|
}
|
|
if expected := 5678; s.Port != expected {
|
|
return false, fmt.Errorf("expected driver-service to have port=%d but found %d",
|
|
expected, s.Port)
|
|
}
|
|
default:
|
|
return false, fmt.Errorf("unexpected service: %q", s.Service)
|
|
}
|
|
|
|
}
|
|
|
|
checks := ctx.consul.CheckRegs()
|
|
if n := len(checks); n != 3 {
|
|
return false, fmt.Errorf("expected 3 checks, but found %d", n)
|
|
}
|
|
for _, check := range checks {
|
|
switch check.Name {
|
|
case "driver-check":
|
|
if expected := "10.1.2.3:1234"; check.TCP != expected {
|
|
return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP)
|
|
}
|
|
case "driver-label-check":
|
|
if expected := "10.1.2.3:80"; check.TCP != expected {
|
|
return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP)
|
|
}
|
|
case "host-check":
|
|
if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) {
|
|
return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP)
|
|
}
|
|
default:
|
|
return false, fmt.Errorf("unexpected check: %q", check.Name)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
services, _ := ctx.consul.Services()
|
|
for _, s := range services {
|
|
t.Logf(pretty.Sprint("Service: ", s))
|
|
}
|
|
for _, c := range ctx.consul.CheckRegs() {
|
|
t.Logf(pretty.Sprint("Check: ", c))
|
|
}
|
|
t.Fatalf("error: %v", err)
|
|
})
|
|
}
|