Merge pull request #5349 from hashicorp/port-tests-20190221

Port some 0.8.7 alloc runner tests

commit 7c9b40a6f1
@@ -2,12 +2,15 @@ package allocrunner

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/client/allochealth"
	"github.com/hashicorp/nomad/client/allocwatcher"
	cconsul "github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/command/agent/consul"

@@ -625,3 +628,152 @@ func TestAllocRunner_Destroy(t *testing.T) {
		require.Failf(t, "expected NotExist error", "found %v", err)
	}
}

func TestAllocRunner_SimpleRun(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	// Wait for the batch alloc to run to completion
	testutil.WaitForResult(func() (bool, error) {
		state := ar.AllocState()

		if state.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete)
		}

		for t, s := range state.TaskStates {
			if s.FinishedAt.IsZero() {
				return false, fmt.Errorf("task %q has zero FinishedAt value", t)
			}
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}
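
The destroy helper deferred above is not part of this hunk. A minimal sketch of what such a blocking-cleanup helper presumably looks like (Destroy and DestroyCh are allocRunner methods used elsewhere in this package; the body here is an assumption, not the committed code):

	// destroy shuts the alloc runner down and blocks until teardown
	// finishes, so tests don't leak running alloc runners.
	func destroy(ar *allocRunner) {
		ar.Destroy()     // begin asynchronous teardown
		<-ar.DestroyCh() // wait for teardown to complete
	}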

// TestAllocRunner_MoveAllocDir asserts that a rescheduled
// allocation copies ephemeral disk content from the previous alloc run
func TestAllocRunner_MoveAllocDir(t *testing.T) {
	t.Parallel()

	// Step 1: start and run a task
	alloc := mock.BatchAlloc()
	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	ar.Run()
	defer destroy(ar)

	require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus)

	// Step 2: write marker files into its shared and task-local dirs
	task := alloc.Job.TaskGroups[0].Tasks[0]
	dataFile := filepath.Join(ar.allocDir.SharedDir, "data", "data_file")
	require.NoError(t, ioutil.WriteFile(dataFile, []byte("hello world"), os.ModePerm))
	taskDir := ar.allocDir.TaskDirs[task.Name]
	taskLocalFile := filepath.Join(taskDir.LocalDir, "local_file")
	require.NoError(t, ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm))

	// Step 3: start a new alloc that links to the previous one via a
	// sticky ephemeral disk
	alloc2 := mock.BatchAlloc()
	alloc2.PreviousAllocation = alloc.ID
	alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true

	conf2, cleanup := testAllocRunnerConfig(t, alloc2)
	conf2.PrevAllocWatcher, conf2.PrevAllocMigrator = allocwatcher.NewAllocWatcher(allocwatcher.Config{
		Alloc:          alloc2,
		PreviousRunner: ar,
		Logger:         conf2.Logger,
	})
	defer cleanup()
	ar2, err := NewAllocRunner(conf2)
	require.NoError(t, err)

	ar2.Run()
	defer destroy(ar2)

	require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus)

	// Ensure that data from ar was moved to ar2
	dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
	fileInfo, _ := os.Stat(dataFile)
	require.NotNilf(t, fileInfo, "file %q not found", dataFile)

	taskDir = ar2.allocDir.TaskDirs[task.Name]
	taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file")
	fileInfo, _ = os.Stat(taskLocalFile)
	require.NotNilf(t, fileInfo, "file %q not found", taskLocalFile)
}
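
The sticky-disk behavior exercised in Step 3 comes from the task group's EphemeralDisk block. A sketch of the relevant fields, with names from nomad/structs and illustrative values (mock.BatchAlloc already populates this block; the test only flips Sticky):

	alloc2.Job.TaskGroups[0].EphemeralDisk = &structs.EphemeralDisk{
		Sticky:  true, // prefer the same node and preserve the alloc dir
		Migrate: true, // copy data over when rescheduled to another node
		SizeMB:  150,
	}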

// TestAllocRunner_HandlesArtifactFailure ensures that while one task in a
// task group retries fetching an artifact, other tasks in the group can
// proceed.
func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
		Mode:     structs.RestartPolicyModeFail,
		Attempts: 1,
		Delay:    time.Nanosecond,
		Interval: time.Hour,
	}

	// Create a new task with a bad artifact; port 0 is never reachable,
	// so the fetch always fails and triggers retries.
	badtask := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	badtask.Name = "bad"
	badtask.Artifacts = []*structs.TaskArtifact{
		{GetterSource: "http://127.0.0.1:0/foo/bar/baz"},
	}

	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask)
	alloc.AllocatedResources.Tasks["bad"] = &structs.AllocatedTaskResources{
		Cpu: structs.AllocatedCpuResources{
			CpuShares: 500,
		},
		Memory: structs.AllocatedMemoryResources{
			MemoryMB: 256,
		},
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	testutil.WaitForResult(func() (bool, error) {
		state := ar.AllocState()

		switch state.ClientStatus {
		case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
			return true, nil
		default:
			return false, fmt.Errorf("got status %v but want terminal", state.ClientStatus)
		}
	}, func(err error) {
		require.NoError(t, err)
	})

	// The alloc fails overall, but the "web" task ran to success while
	// the "bad" task failed.
	state := ar.AllocState()
	require.Equal(t, structs.AllocClientStatusFailed, state.ClientStatus)
	require.Equal(t, structs.TaskStateDead, state.TaskStates["web"].State)
	require.True(t, state.TaskStates["web"].Successful())
	require.Equal(t, structs.TaskStateDead, state.TaskStates["bad"].State)
	require.True(t, state.TaskStates["bad"].Failed)
}
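These alloc runner tests all start from mock.BatchAlloc. For orientation, a rough sketch of what that mock presumably provides; the "web" task name matches the assertions above, but the config values here are illustrative, not copied from the nomad/mock package:

	alloc := mock.Alloc()
	alloc.Job.Type = structs.JobTypeBatch
	task := alloc.Job.TaskGroups[0].Tasks[0] // the "web" task
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "500ms", // mock driver exits successfully after this long
	}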

@@ -1238,6 +1238,323 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
	require.Equal(t, structs.TaskNotRestarting, state.Events[5].Type)
}

// TestTaskRunner_Template_Artifact asserts that tasks can use artifacts as templates.
func TestTaskRunner_Template_Artifact(t *testing.T) {
	t.Parallel()

	// Serve the test's working directory so the task can fetch this
	// package's own source file as its artifact.
	ts := httptest.NewServer(http.FileServer(http.Dir(".")))
	defer ts.Close()

	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	f1 := "task_runner.go"
	f2 := "test"
	task.Artifacts = []*structs.TaskArtifact{
		{GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1)},
	}
	task.Templates = []*structs.Template{
		{
			SourcePath: f1,
			DestPath:   "local/test",
			ChangeMode: structs.TemplateChangeModeNoop,
		},
	}

	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
	defer cleanup()

	tr, err := NewTaskRunner(conf)
	require.NoError(t, err)
	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
	go tr.Run()

	// Wait for task to run and exit
	select {
	case <-tr.WaitCh():
	case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())):
		require.Fail(t, "timed out waiting for task runner to exit")
	}

	state := tr.TaskState()
	require.Equal(t, structs.TaskStateDead, state.State)
	require.True(t, state.Successful())
	require.False(t, state.Failed)

	artifactsDownloaded := false
	for _, e := range state.Events {
		if e.Type == structs.TaskDownloadingArtifacts {
			artifactsDownloaded = true
		}
	}
	assert.True(t, artifactsDownloaded, "expected artifacts downloaded events")

	// Check that both files exist: the downloaded artifact and the
	// template rendered from it.
	_, err = os.Stat(filepath.Join(conf.TaskDir.Dir, f1))
	require.NoErrorf(t, err, "%v not downloaded", f1)

	_, err = os.Stat(filepath.Join(conf.TaskDir.LocalDir, f2))
	require.NoErrorf(t, err, "%v not rendered", f2)
}
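The two Stat checks rely on the task directory layout: artifacts are fetched into the task directory root, conf.TaskDir.Dir, while template DestPath values like "local/test" resolve under conf.TaskDir.LocalDir, the task's local/ subdirectory. That is why f1 is checked in TaskDir.Dir and the rendered f2 in TaskDir.LocalDir.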

// TestTaskRunner_Template_NewVaultToken asserts that a new Vault token is
// created when rendering a template and that it is revoked when the alloc
// completes.
func TestTaskRunner_Template_NewVaultToken(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Templates = []*structs.Template{
		{
			EmbeddedTmpl: `{{key "foo"}}`,
			DestPath:     "local/test",
			ChangeMode:   structs.TemplateChangeModeNoop,
		},
	}
	task.Vault = &structs.Vault{Policies: []string{"default"}}

	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
	defer cleanup()

	tr, err := NewTaskRunner(conf)
	require.NoError(t, err)
	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
	go tr.Run()

	// Wait for a Vault token
	var token string
	testutil.WaitForResult(func() (bool, error) {
		token = tr.getVaultToken()

		if token == "" {
			return false, fmt.Errorf("no Vault token")
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	// Fail the token renewal to force the runner to derive a new token.
	vault := conf.Vault.(*vaultclient.MockVaultClient)
	renewalCh, ok := vault.RenewTokens()[token]
	require.True(t, ok, "no renewal channel for token")

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	var token2 string
	testutil.WaitForResult(func() (bool, error) {
		token2 = tr.getVaultToken()

		if token2 == "" {
			return false, fmt.Errorf("no Vault token")
		}

		if token2 == token {
			return false, fmt.Errorf("token wasn't recreated")
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	// Check that the original token was revoked
	testutil.WaitForResult(func() (bool, error) {
		if len(vault.StoppedTokens()) != 1 {
			return false, fmt.Errorf("expected a stopped token: %v", vault.StoppedTokens())
		}

		if a := vault.StoppedTokens()[0]; a != token {
			return false, fmt.Errorf("got stopped token %q; want %q", a, token)
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}
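
The renewal channel used above is the mock's hook for simulating token expiry: each derived token gets an error channel from vault.RenewTokens(), and sending on it behaves as if Vault reported the token unrenewable, which drives the change-mode handling under test. A minimal sketch of that pattern (the helper name is illustrative, not from the commit):

	// simulateTokenExpiry makes the mock report that token can no longer
	// be renewed, forcing the task runner to react per its change mode.
	func simulateTokenExpiry(vault *vaultclient.MockVaultClient, token string) {
		renewalCh := vault.RenewTokens()[token]
		renewalCh <- fmt.Errorf("token expired") // delivered to the renewal watcher
		close(renewalCh)
	}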

// TestTaskRunner_VaultManager_Restart asserts that the task is restarted
// when its Vault-derived token expires and the task's Vault change mode is
// "restart".
func TestTaskRunner_VaultManager_Restart(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}
	task.Vault = &structs.Vault{
		Policies:   []string{"default"},
		ChangeMode: structs.VaultChangeModeRestart,
	}

	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
	defer cleanup()

	tr, err := NewTaskRunner(conf)
	require.NoError(t, err)
	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
	go tr.Run()

	testWaitForTaskToStart(t, tr)

	tr.vaultTokenLock.Lock()
	token := tr.vaultToken
	tr.vaultTokenLock.Unlock()

	require.NotEmpty(t, token)

	// Fail the renewal so the vault hook restarts the task.
	vault := conf.Vault.(*vaultclient.MockVaultClient)
	renewalCh, ok := vault.RenewTokens()[token]
	require.True(t, ok, "no renewal channel for token")

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	testutil.WaitForResult(func() (bool, error) {
		state := tr.TaskState()

		if len(state.Events) == 0 {
			return false, fmt.Errorf("no events yet")
		}

		foundRestartSignal, foundRestarting := false, false
		for _, e := range state.Events {
			switch e.Type {
			case structs.TaskRestartSignal:
				foundRestartSignal = true
			case structs.TaskRestarting:
				foundRestarting = true
			}
		}

		if !foundRestartSignal {
			return false, fmt.Errorf("no restart signal event yet: %#v", state.Events)
		}

		if !foundRestarting {
			return false, fmt.Errorf("no restarting event yet: %#v", state.Events)
		}

		lastEvent := state.Events[len(state.Events)-1]
		if lastEvent.Type != structs.TaskStarted {
			return false, fmt.Errorf("expected last event to be task started but was %#v", lastEvent)
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}
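
The TaskRestartSignal event this test waits for is emitted by the vault hook; the one-line vault_hook.go change at the end of this diff, which replaces TaskRestarting with TaskRestartSignal in the Restart lifecycle call, is what makes the hook's event match this assertion.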

// TestTaskRunner_VaultManager_Signal asserts that the task is signalled
// when its Vault-derived token expires and the task's Vault change mode is
// "signal".
func TestTaskRunner_VaultManager_Signal(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}
	task.Vault = &structs.Vault{
		Policies:     []string{"default"},
		ChangeMode:   structs.VaultChangeModeSignal,
		ChangeSignal: "SIGUSR1",
	}

	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
	defer cleanup()

	tr, err := NewTaskRunner(conf)
	require.NoError(t, err)
	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
	go tr.Run()

	testWaitForTaskToStart(t, tr)

	tr.vaultTokenLock.Lock()
	token := tr.vaultToken
	tr.vaultTokenLock.Unlock()

	require.NotEmpty(t, token)

	// Fail the renewal so the vault hook signals the task.
	vault := conf.Vault.(*vaultclient.MockVaultClient)
	renewalCh, ok := vault.RenewTokens()[token]
	require.True(t, ok, "no renewal channel for token")

	renewalCh <- fmt.Errorf("Test killing")
	close(renewalCh)

	testutil.WaitForResult(func() (bool, error) {
		state := tr.TaskState()

		if len(state.Events) == 0 {
			return false, fmt.Errorf("no events yet")
		}

		foundSignaling := false
		for _, e := range state.Events {
			if e.Type == structs.TaskSignaling {
				foundSignaling = true
			}
		}

		if !foundSignaling {
			return false, fmt.Errorf("no signaling event yet: %#v", state.Events)
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}

// TestTaskRunner_UnregisterConsul_Retries asserts a task is unregistered
// from Consul while it waits to be retried.
func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	// Make the restart policy retry once
	alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{
		Attempts: 1,
		Interval: 10 * time.Minute,
		Delay:    time.Nanosecond,
		Mode:     structs.RestartPolicyModeFail,
	}
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code": "1",
		"run_for":   "1ns",
	}

	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
	defer cleanup()

	tr, err := NewTaskRunner(conf)
	require.NoError(t, err)
	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
	tr.Run()

	state := tr.TaskState()
	require.Equal(t, structs.TaskStateDead, state.State)

	// Two runs (the original and one retry), each an add followed by two
	// removals: 2 runs x 3 ops = 6 ops.
	consul := conf.Consul.(*consulapi.MockConsulServiceClient)
	consulOps := consul.GetOps()
	require.Len(t, consulOps, 6)
	require.Equal(t, "add", consulOps[0].Op)
	require.Equal(t, "remove", consulOps[1].Op)
	require.Equal(t, "remove", consulOps[2].Op)
	require.Equal(t, "add", consulOps[3].Op)
	require.Equal(t, "remove", consulOps[4].Op)
	require.Equal(t, "remove", consulOps[5].Op)
}

// testWaitForTaskToStart waits for the task to be running or fails the test
func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
	testutil.WaitForResult(func() (bool, error) {
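The hunk is truncated here by the diff view. Presumably the helper polls TaskState until the task reports running; a sketch of how it likely continues (body assumed, not from the commit):

		ts := tr.TaskState()
		return ts.State == structs.TaskStateRunning,
			fmt.Errorf("expected task running, got %q", ts.State) // only used on timeout
	}, func(err error) {
		require.NoError(t, err)
	})
}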

@@ -254,7 +254,7 @@ OUTER:
	case structs.VaultChangeModeRestart:
		const noFailure = false
		h.lifecycle.Restart(h.ctx,
-			structs.NewTaskEvent(structs.TaskRestarting).
+			structs.NewTaskEvent(structs.TaskRestartSignal).
				SetDisplayMessage("Vault: new Vault token acquired"), false)
	case structs.VaultChangeModeNoop:
		fallthrough