Fix lints

commit 7946a14aa8
parent 89dafaaea9

@@ -76,7 +76,7 @@ func (ar *allocRunner) initRunnerHooks() {
 	hs := &allocHealthSetter{ar}

 	// Create the alloc directory hook. This is run first to ensure the
-	// directoy path exists for other hooks.
+	// directory path exists for other hooks.
 	ar.runnerHooks = []interfaces.RunnerHook{
 		newAllocDirHook(hookLogger, ar.allocDir),
 		newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
@@ -20,7 +20,7 @@ func TestHandleResult_Wait_Result(t *testing.T) {
 	outCh1 := make(chan *structs.WaitResult)
 	outCh2 := make(chan *structs.WaitResult)

-	// Create two recievers
+	// Create two receivers
 	go func() {
 		outCh1 <- h.Wait(context.Background())
 	}()
@@ -69,10 +69,6 @@ type TaskRunner struct {
 	// stateDB is for persisting localState and taskState
 	stateDB cstate.StateDB

-	// persistedHash is the hash of the last persisted state for skipping
-	// unnecessary writes
-	persistedHash []byte
-
 	// ctx is the task runner's context representing the tasks's lifecycle.
 	// Canceling the context will cause the task to be destroyed.
 	ctx context.Context
@@ -632,7 +628,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
 	// Ensure the event is populated with human readable strings
 	event.PopulateEventDisplayMessage()

-	// Propogate failure from event to task state
+	// Propagate failure from event to task state
 	if event.FailsTask {
 		tr.state.Failed = true
 	}
@@ -19,7 +19,7 @@ func (tr *TaskRunner) initHooks() {
 	tr.logmonHookConfig = newLogMonHookConfig(task.Name, tr.taskDir.LogDir)

 	// Create the task directory hook. This is run first to ensure the
-	// directoy path exists for other hooks.
+	// directory path exists for other hooks.
 	tr.runnerHooks = []interfaces.TaskHook{
 		newValidateHook(tr.clientConfig, hookLogger),
 		newTaskDirHook(tr, hookLogger),
@@ -26,11 +26,11 @@ import (
 )

 var (
-	// The following are the key paths written to the state database
-	allocRunnerStateAllocKey     = []byte("alloc")
-	allocRunnerStateImmutableKey = []byte("immutable")
-	allocRunnerStateMutableKey   = []byte("mutable")
-	allocRunnerStateAllocDirKey  = []byte("alloc-dir")
+	// The following are the key paths written to the state database
+	//allocRunnerStateAllocKey = []byte("alloc")
+	//allocRunnerStateImmutableKey = []byte("immutable")
+	//allocRunnerStateMutableKey = []byte("mutable")
+	//allocRunnerStateAllocDirKey = []byte("alloc-dir")
 )

 // AllocStateUpdater is used to update the status of an allocation
@@ -115,23 +115,23 @@ type AllocRunner struct {

 // allocRunnerAllocState is state that only has to be written when the alloc
 // changes.
-type allocRunnerAllocState struct {
-	Alloc *structs.Allocation
-}
+//type allocRunnerAllocState struct {
+//Alloc *structs.Allocation
+//}

-// allocRunnerImmutableState is state that only has to be written once.
-type allocRunnerImmutableState struct {
-	Version string
-}
+//// allocRunnerImmutableState is state that only has to be written once.
+//type allocRunnerImmutableState struct {
+//Version string
+//}

 // allocRunnerMutableState is state that has to be written on each save as it
 // changes over the life-cycle of the alloc_runner.
-type allocRunnerMutableState struct {
-	AllocClientStatus      string
-	AllocClientDescription string
-	TaskStates             map[string]*structs.TaskState
-	DeploymentStatus       *structs.AllocDeploymentStatus
-}
+//type allocRunnerMutableState struct {
+//AllocClientStatus string
+//AllocClientDescription string
+//TaskStates map[string]*structs.TaskState
+//DeploymentStatus *structs.AllocDeploymentStatus
+//}

 // NewAllocRunner is used to create a new allocation context
 func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
@@ -64,9 +64,9 @@ const (
 )

 var (
-	// taskRunnerStateAllKey holds all the task runners state. At the moment
-	// there is no need to split it
-	taskRunnerStateAllKey = []byte("simple-all")
+	// taskRunnerStateAllKey holds all the task runners state. At the moment
+	// there is no need to split it
+	//taskRunnerStateAllKey = []byte("simple-all")
 )

 // taskRestartEvent wraps a TaskEvent with additional metadata to control
@@ -39,7 +39,7 @@ func newFakeAllocRunner(t *testing.T, logger hclog.Logger) *fakeAllocRunner {
 	alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true
 	alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true

-	path, err := ioutil.TempDir("", "nomad_test_wathcer")
+	path, err := ioutil.TempDir("", "nomad_test_watcher")
 	require.NoError(t, err)

 	return &fakeAllocRunner{
@@ -43,7 +43,7 @@ func (m *MountPointDetectorEmptyMountPoint) MountPoint() (string, error) {
 func TestCGroupFingerprint(t *testing.T) {
 	{
 		f := &CGroupFingerprint{
-			logger:             testlog.Logger(t),
+			logger:             testlog.HCLogger(t),
 			lastState:          cgroupUnavailable,
 			mountPointDetector: &MountPointDetectorMountPointFail{},
 		}
@@ -66,7 +66,7 @@ func TestCGroupFingerprint(t *testing.T) {

 	{
 		f := &CGroupFingerprint{
-			logger:             testlog.Logger(t),
+			logger:             testlog.HCLogger(t),
 			lastState:          cgroupUnavailable,
 			mountPointDetector: &MountPointDetectorValidMountPoint{},
 		}
@@ -88,7 +88,7 @@ func TestCGroupFingerprint(t *testing.T) {

 	{
 		f := &CGroupFingerprint{
-			logger:             testlog.Logger(t),
+			logger:             testlog.HCLogger(t),
 			lastState:          cgroupUnavailable,
 			mountPointDetector: &MountPointDetectorEmptyMountPoint{},
 		}
@@ -109,7 +109,7 @@ func TestCGroupFingerprint(t *testing.T) {
 	}
 	{
 		f := &CGroupFingerprint{
-			logger:             testlog.Logger(t),
+			logger:             testlog.HCLogger(t),
 			lastState:          cgroupAvailable,
 			mountPointDetector: &MountPointDetectorValidMountPoint{},
 		}
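
The four hunks above switch the cgroup fingerprint tests from the standard-library test logger (testlog.Logger) to an hclog-based one (testlog.HCLogger). As a rough illustration only, such a helper can be built by adapting *testing.T into an io.Writer; this sketch is an assumption about the shape of the helper, not Nomad's actual helper/testlog code.

package testlog

import (
	"testing"

	hclog "github.com/hashicorp/go-hclog"
)

// writer adapts a *testing.T so log lines show up in `go test` output
// attributed to the test that produced them.
type writer struct{ t *testing.T }

func (w writer) Write(p []byte) (int, error) {
	w.t.Logf("%s", p)
	return len(p), nil
}

// HCLogger returns an hclog.Logger that writes through t.Logf.
func HCLogger(t *testing.T) hclog.Logger {
	return hclog.New(&hclog.LoggerOptions{
		Name:   "test",
		Level:  hclog.Trace,
		Output: writer{t},
	})
}
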
@@ -51,11 +51,19 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string,
 // watching and is normally fulfilled by a TaskRunner.
 //
 // Restarts are recorded in the []restarts field and re-Watch the check.
-func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
-	c.restarts = append(c.restarts, checkRestartRecord{time.Now(), source, reason, failure})
+//func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) {
+func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
+	restart := checkRestartRecord{
+		timestamp: time.Now(),
+		source:    event.Type,
+		reason:    event.DisplayMessage,
+		failure:   failure,
+	}
+	c.restarts = append(c.restarts, restart)

 	// Re-Watch the check just like TaskRunner
 	c.watcher.Watch(c.allocID, c.taskName, c.checkName, c.check, c)
+	return nil
 }

 // String for debugging
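
In this hunk the fake restarter's Restart method changes from (source, reason string, failure bool) to a context- and event-based signature that returns an error. That implies the check watcher now calls restarters through an interface roughly like the sketch below; the interface name is illustrative and not taken from this diff, only the method signature comes from the code above.

package example

import (
	"context"

	"github.com/hashicorp/nomad/nomad/structs"
)

// TaskRestarter is an assumed name for the interface that fakeCheckRestarter
// (and restartRecorder later in this commit) now satisfies; the signature
// matches the new Restart implementations exactly.
type TaskRestarter interface {
	Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error
}
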
@@ -1,33 +1,44 @@
package consul_test

import (
	"context"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/boltdb/bolt"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/testutil"
	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocdir"
	"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/client/vaultclient"
	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

type mockUpdater struct {
	logger log.Logger
}

func (m *mockUpdater) TaskStateUpdated(task string, state *structs.TaskState) {
	m.logger.Named("test.updater").Debug("update", "task", task, "state", state)
}

// TODO Fix
// TestConsul_Integration asserts TaskRunner properly registers and deregisters
// services and checks with Consul using an embedded Consul agent.
func TestConsul_Integration(t *testing.T) {
	if testing.Short() {
		t.Skip("-short set; skipping")
	}
	assert := assert.New(t)
	require := require.New(t)

	// Create an embedded Consul server
	testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
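
The new mockUpdater type gives the test a TaskStateUpdated callback to hand to the task runner. A minimal sketch of the interface it presumably satisfies is below; the interface name is an assumption, only the method signature comes from the code above.

package example

import "github.com/hashicorp/nomad/nomad/structs"

// TaskStateHandler is an assumed name; the method signature matches
// mockUpdater.TaskStateUpdated in the test file above.
type TaskStateHandler interface {
	TaskStateUpdated(task string, state *structs.TaskState)
}
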
@@ -61,15 +72,6 @@ func TestConsul_Integration(t *testing.T) {
 	}
 	defer os.RemoveAll(conf.AllocDir)

-	tmp, err := ioutil.TempFile("", "state-db")
-	if err != nil {
-		t.Fatalf("error creating state db file: %v", err)
-	}
-	db, err := bolt.Open(tmp.Name(), 0600, nil)
-	if err != nil {
-		t.Fatalf("error creating state db: %v", err)
-	}
-
 	alloc := mock.Alloc()
 	task := alloc.Job.TaskGroups[0].Tasks[0]
 	task.Driver = "mock_driver"
@@ -121,10 +123,8 @@ func TestConsul_Integration(t *testing.T) {
 		},
 	}

-	logger := testlog.Logger(t)
-	logUpdate := func(name, state string, event *structs.TaskEvent, lazySync bool) {
-		logger.Printf("[TEST] test.updater: name=%q state=%q event=%v", name, state, event)
-	}
+	logger := testlog.HCLogger(t)
+	logUpdate := &mockUpdater{logger}
 	allocDir := allocdir.NewAllocDir(logger, filepath.Join(conf.AllocDir, alloc.ID))
 	if err := allocDir.Build(); err != nil {
 		t.Fatalf("error building alloc dir: %v", err)
@@ -132,7 +132,7 @@ func TestConsul_Integration(t *testing.T) {
 	taskDir := allocDir.NewTaskDir(task.Name)
 	vclient := vaultclient.NewMockVaultClient()
 	consulClient, err := consulapi.NewClient(consulConfig)
-	assert.Nil(err)
+	require.Nil(err)

 	serviceClient := consul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
 	defer serviceClient.Shutdown() // just-in-case cleanup
@@ -141,8 +141,22 @@ func TestConsul_Integration(t *testing.T) {
 		serviceClient.Run()
 		close(consulRan)
 	}()
-	tr := taskrunner.NewTaskRunner(logger, conf, db, logUpdate, taskDir, alloc, task, vclient, serviceClient)
-	tr.MarkReceived()
+
+	// Build the config
+	config := &taskrunner.Config{
+		Alloc:        alloc,
+		ClientConfig: conf,
+		Consul:       serviceClient,
+		Task:         task,
+		TaskDir:      taskDir,
+		Logger:       logger,
+		VaultClient:  vclient,
+		StateDB:      state.NoopDB{},
+		StateUpdater: logUpdate,
+	}
+
+	tr, err := taskrunner.NewTaskRunner(config)
+	require.NoError(err)
 	go tr.Run()
 	defer func() {
 		// Make sure we always shutdown task runner when the test exits
@@ -150,14 +164,14 @@ func TestConsul_Integration(t *testing.T) {
 		case <-tr.WaitCh():
 			// Exited cleanly, no need to kill
 		default:
-			tr.Kill("", "", true) // just in case
+			tr.Kill(context.Background(), &structs.TaskEvent{}) // just in case
 		}
 	}()

 	// Block waiting for the service to appear
 	catalog := consulClient.Catalog()
 	res, meta, err := catalog.Service("httpd2", "test", nil)
-	assert.Nil(err)
+	require.Nil(err)

 	for i := 0; len(res) == 0 && i < 10; i++ {
 		//Expected initial request to fail, do a blocking query
@@ -166,7 +180,7 @@ func TestConsul_Integration(t *testing.T) {
 			t.Fatalf("error querying for service: %v", err)
 		}
 	}
-	assert.Len(res, 1)
+	require.Len(res, 1)

 	// Truncate results
 	res = res[:]
@@ -174,16 +188,16 @@ func TestConsul_Integration(t *testing.T) {
 	// Assert the service with the checks exists
 	for i := 0; len(res) == 0 && i < 10; i++ {
 		res, meta, err = catalog.Service("httpd", "http", &consulapi.QueryOptions{WaitIndex: meta.LastIndex + 1, WaitTime: 3 * time.Second})
-		assert.Nil(err)
+		require.Nil(err)
 	}
-	assert.Len(res, 1)
+	require.Len(res, 1)

 	// Assert the script check passes (mock_driver script checks always
 	// pass) after having time to run once
 	time.Sleep(2 * time.Second)
 	checks, _, err := consulClient.Health().Checks("httpd", nil)
-	assert.Nil(err)
-	assert.Len(checks, 2)
+	require.Nil(err)
+	require.Len(checks, 2)

 	for _, check := range checks {
 		if expected := "httpd"; check.ServiceName != expected {
@@ -220,10 +234,10 @@ func TestConsul_Integration(t *testing.T) {
 		t.Fatalf("Unexpected number of checks registered. Got %d; want 2", cnum)
 	}

-	logger.Printf("[TEST] consul.test: killing task")
+	logger.Debug("killing task")

 	// Kill the task
-	tr.Kill("", "", false)
+	tr.Kill(context.Background(), &structs.TaskEvent{})

 	select {
 	case <-tr.WaitCh():
@@ -238,7 +252,7 @@ func TestConsul_Integration(t *testing.T) {

 	// Ensure Consul is clean
 	services, _, err := catalog.Services(nil)
-	assert.Nil(err)
-	assert.Len(services, 1)
-	assert.Contains(services, "consul")
+	require.Nil(err)
+	require.Len(services, 1)
+	require.Contains(services, "consul")
 }
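
The hunks in this test convert assert.* calls to require.*. The practical difference in testify is that assert records a failure and lets the test keep running, while require stops the test immediately, so later assertions cannot fire on already-invalid state. A small self-contained illustration of that behavior:

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestAssertVsRequire(t *testing.T) {
	var items []string

	// assert marks the test failed but execution continues.
	assert.Len(t, items, 1)

	// require marks the test failed and returns from the test function
	// immediately (via t.FailNow), so the final log line below is never
	// reached when this check fails.
	require.Len(t, items, 1)

	t.Log("only reached if the require above passed")
}
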
@@ -85,8 +85,9 @@ type restartRecorder struct {
 	restarts int64
 }

-func (r *restartRecorder) Restart(source, reason string, failure bool) {
+func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
 	atomic.AddInt64(&r.restarts, 1)
+	return nil
 }

 // testFakeCtx contains a fake Consul AgentAPI
@@ -292,6 +292,8 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {

 	// Task should terminate quickly
+	waitCh, err := harness.WaitTask(context.Background(), task.ID)
+	require.NoError(err)

 	select {
 	case res := <-waitCh:
 		require.NoError(res.Err)
@@ -32,7 +32,7 @@ func (h *rawExecTaskHandle) IsRunning() bool {

 func (h *rawExecTaskHandle) run() {

-	// since run is called immediatly after the handle is created this
+	// since run is called immediately after the handle is created this
 	// ensures the exitResult is initialized so we avoid a nil pointer
 	// thus it does not need to be included in the lock
 	if h.exitResult == nil {
@@ -68,7 +68,7 @@ type Fingerprint struct {
 	Health            HealthState
 	HealthDescription string

-	// Err is set by the plugin if an error occured during fingerprinting
+	// Err is set by the plugin if an error occurred during fingerprinting
 	Err error
 }

@@ -207,7 +207,7 @@ type TaskEvent struct {
 	Message     string
 	Annotations map[string]string

-	// Err is only used if an error occured while consuming the RPC stream
+	// Err is only used if an error occurred while consuming the RPC stream
 	Err error
 }
