diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index db0d9a506..02372f9a8 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -76,7 +76,7 @@ func (ar *allocRunner) initRunnerHooks() { hs := &allocHealthSetter{ar} // Create the alloc directory hook. This is run first to ensure the - // directoy path exists for other hooks. + // directory path exists for other hooks. ar.runnerHooks = []interfaces.RunnerHook{ newAllocDirHook(hookLogger, ar.allocDir), newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir), diff --git a/client/allocrunner/taskrunner/handleproxy_test.go b/client/allocrunner/taskrunner/handleproxy_test.go index 5231d5297..b43029407 100644 --- a/client/allocrunner/taskrunner/handleproxy_test.go +++ b/client/allocrunner/taskrunner/handleproxy_test.go @@ -20,7 +20,7 @@ func TestHandleResult_Wait_Result(t *testing.T) { outCh1 := make(chan *structs.WaitResult) outCh2 := make(chan *structs.WaitResult) - // Create two recievers + // Create two receivers go func() { outCh1 <- h.Wait(context.Background()) }() diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index e93456873..3f93313ca 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -69,10 +69,6 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB - // persistedHash is the hash of the last persisted state for skipping - // unnecessary writes - persistedHash []byte - // ctx is the task runner's context representing the tasks's lifecycle. // Canceling the context will cause the task to be destroyed. ctx context.Context @@ -632,7 +628,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error { // Ensure the event is populated with human readable strings event.PopulateEventDisplayMessage() - // Propogate failure from event to task state + // Propagate failure from event to task state if event.FailsTask { tr.state.Failed = true } diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 7d9b4196f..7ad89f64b 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -19,7 +19,7 @@ func (tr *TaskRunner) initHooks() { tr.logmonHookConfig = newLogMonHookConfig(task.Name, tr.taskDir.LogDir) // Create the task directory hook. This is run first to ensure the - // directoy path exists for other hooks. + // directory path exists for other hooks. tr.runnerHooks = []interfaces.TaskHook{ newValidateHook(tr.clientConfig, hookLogger), newTaskDirHook(tr, hookLogger), diff --git a/client/allocrunnerdeprecated/alloc_runner.go b/client/allocrunnerdeprecated/alloc_runner.go index f5636802b..69912380d 100644 --- a/client/allocrunnerdeprecated/alloc_runner.go +++ b/client/allocrunnerdeprecated/alloc_runner.go @@ -26,11 +26,11 @@ import ( ) var ( - // The following are the key paths written to the state database - allocRunnerStateAllocKey = []byte("alloc") - allocRunnerStateImmutableKey = []byte("immutable") - allocRunnerStateMutableKey = []byte("mutable") - allocRunnerStateAllocDirKey = []byte("alloc-dir") +// The following are the key paths written to the state database +//allocRunnerStateAllocKey = []byte("alloc") +//allocRunnerStateImmutableKey = []byte("immutable") +//allocRunnerStateMutableKey = []byte("mutable") +//allocRunnerStateAllocDirKey = []byte("alloc-dir") ) // AllocStateUpdater is used to update the status of an allocation @@ -115,23 +115,23 @@ type AllocRunner struct { // allocRunnerAllocState is state that only has to be written when the alloc // changes. -type allocRunnerAllocState struct { - Alloc *structs.Allocation -} +//type allocRunnerAllocState struct { +//Alloc *structs.Allocation +//} -// allocRunnerImmutableState is state that only has to be written once. -type allocRunnerImmutableState struct { - Version string -} +//// allocRunnerImmutableState is state that only has to be written once. +//type allocRunnerImmutableState struct { +//Version string +//} // allocRunnerMutableState is state that has to be written on each save as it // changes over the life-cycle of the alloc_runner. -type allocRunnerMutableState struct { - AllocClientStatus string - AllocClientDescription string - TaskStates map[string]*structs.TaskState - DeploymentStatus *structs.AllocDeploymentStatus -} +//type allocRunnerMutableState struct { +//AllocClientStatus string +//AllocClientDescription string +//TaskStates map[string]*structs.TaskState +//DeploymentStatus *structs.AllocDeploymentStatus +//} // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater, diff --git a/client/allocrunnerdeprecated/taskrunner/task_runner.go b/client/allocrunnerdeprecated/taskrunner/task_runner.go index e34903532..9642591b0 100644 --- a/client/allocrunnerdeprecated/taskrunner/task_runner.go +++ b/client/allocrunnerdeprecated/taskrunner/task_runner.go @@ -64,9 +64,9 @@ const ( ) var ( - // taskRunnerStateAllKey holds all the task runners state. At the moment - // there is no need to split it - taskRunnerStateAllKey = []byte("simple-all") +// taskRunnerStateAllKey holds all the task runners state. At the moment +// there is no need to split it +//taskRunnerStateAllKey = []byte("simple-all") ) // taskRestartEvent wraps a TaskEvent with additional metadata to control diff --git a/client/allocwatcher/alloc_watcher_test.go b/client/allocwatcher/alloc_watcher_test.go index b8a3c8581..6eb7bcc3d 100644 --- a/client/allocwatcher/alloc_watcher_test.go +++ b/client/allocwatcher/alloc_watcher_test.go @@ -39,7 +39,7 @@ func newFakeAllocRunner(t *testing.T, logger hclog.Logger) *fakeAllocRunner { alloc.Job.TaskGroups[0].EphemeralDisk.Sticky = true alloc.Job.TaskGroups[0].EphemeralDisk.Migrate = true - path, err := ioutil.TempDir("", "nomad_test_wathcer") + path, err := ioutil.TempDir("", "nomad_test_watcher") require.NoError(t, err) return &fakeAllocRunner{ diff --git a/client/fingerprint/cgroup_test.go b/client/fingerprint/cgroup_test.go index b3894d2d2..5016cd855 100644 --- a/client/fingerprint/cgroup_test.go +++ b/client/fingerprint/cgroup_test.go @@ -43,7 +43,7 @@ func (m *MountPointDetectorEmptyMountPoint) MountPoint() (string, error) { func TestCGroupFingerprint(t *testing.T) { { f := &CGroupFingerprint{ - logger: testlog.Logger(t), + logger: testlog.HCLogger(t), lastState: cgroupUnavailable, mountPointDetector: &MountPointDetectorMountPointFail{}, } @@ -66,7 +66,7 @@ func TestCGroupFingerprint(t *testing.T) { { f := &CGroupFingerprint{ - logger: testlog.Logger(t), + logger: testlog.HCLogger(t), lastState: cgroupUnavailable, mountPointDetector: &MountPointDetectorValidMountPoint{}, } @@ -88,7 +88,7 @@ func TestCGroupFingerprint(t *testing.T) { { f := &CGroupFingerprint{ - logger: testlog.Logger(t), + logger: testlog.HCLogger(t), lastState: cgroupUnavailable, mountPointDetector: &MountPointDetectorEmptyMountPoint{}, } @@ -109,7 +109,7 @@ func TestCGroupFingerprint(t *testing.T) { } { f := &CGroupFingerprint{ - logger: testlog.Logger(t), + logger: testlog.HCLogger(t), lastState: cgroupAvailable, mountPointDetector: &MountPointDetectorValidMountPoint{}, } diff --git a/command/agent/consul/check_watcher_test.go b/command/agent/consul/check_watcher_test.go index cb5eb5cba..fd8d578ed 100644 --- a/command/agent/consul/check_watcher_test.go +++ b/command/agent/consul/check_watcher_test.go @@ -51,11 +51,19 @@ func newFakeCheckRestarter(w *checkWatcher, allocID, taskName, checkName string, // watching and is normally fulfilled by a TaskRunner. // // Restarts are recorded in the []restarts field and re-Watch the check. -func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) { - c.restarts = append(c.restarts, checkRestartRecord{time.Now(), source, reason, failure}) +//func (c *fakeCheckRestarter) Restart(source, reason string, failure bool) { +func (c *fakeCheckRestarter) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + restart := checkRestartRecord{ + timestamp: time.Now(), + source: event.Type, + reason: event.DisplayMessage, + failure: failure, + } + c.restarts = append(c.restarts, restart) // Re-Watch the check just like TaskRunner c.watcher.Watch(c.allocID, c.taskName, c.checkName, c.check, c) + return nil } // String for debugging diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 7239d1904..c279131b9 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -1,33 +1,44 @@ package consul_test import ( + "context" "io/ioutil" "os" "path/filepath" "testing" "time" - "github.com/boltdb/bolt" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil" + log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +type mockUpdater struct { + logger log.Logger +} + +func (m *mockUpdater) TaskStateUpdated(task string, state *structs.TaskState) { + m.logger.Named("test.updater").Debug("update", "task", task, "state", state) +} + +// TODO Fix // TestConsul_Integration asserts TaskRunner properly registers and deregisters // services and checks with Consul using an embedded Consul agent. func TestConsul_Integration(t *testing.T) { if testing.Short() { t.Skip("-short set; skipping") } - assert := assert.New(t) + require := require.New(t) // Create an embedded Consul server testconsul, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) { @@ -61,15 +72,6 @@ func TestConsul_Integration(t *testing.T) { } defer os.RemoveAll(conf.AllocDir) - tmp, err := ioutil.TempFile("", "state-db") - if err != nil { - t.Fatalf("error creating state db file: %v", err) - } - db, err := bolt.Open(tmp.Name(), 0600, nil) - if err != nil { - t.Fatalf("error creating state db: %v", err) - } - alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] task.Driver = "mock_driver" @@ -121,10 +123,8 @@ func TestConsul_Integration(t *testing.T) { }, } - logger := testlog.Logger(t) - logUpdate := func(name, state string, event *structs.TaskEvent, lazySync bool) { - logger.Printf("[TEST] test.updater: name=%q state=%q event=%v", name, state, event) - } + logger := testlog.HCLogger(t) + logUpdate := &mockUpdater{logger} allocDir := allocdir.NewAllocDir(logger, filepath.Join(conf.AllocDir, alloc.ID)) if err := allocDir.Build(); err != nil { t.Fatalf("error building alloc dir: %v", err) @@ -132,7 +132,7 @@ func TestConsul_Integration(t *testing.T) { taskDir := allocDir.NewTaskDir(task.Name) vclient := vaultclient.NewMockVaultClient() consulClient, err := consulapi.NewClient(consulConfig) - assert.Nil(err) + require.Nil(err) serviceClient := consul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true) defer serviceClient.Shutdown() // just-in-case cleanup @@ -141,8 +141,22 @@ func TestConsul_Integration(t *testing.T) { serviceClient.Run() close(consulRan) }() - tr := taskrunner.NewTaskRunner(logger, conf, db, logUpdate, taskDir, alloc, task, vclient, serviceClient) - tr.MarkReceived() + + // Build the config + config := &taskrunner.Config{ + Alloc: alloc, + ClientConfig: conf, + Consul: serviceClient, + Task: task, + TaskDir: taskDir, + Logger: logger, + VaultClient: vclient, + StateDB: state.NoopDB{}, + StateUpdater: logUpdate, + } + + tr, err := taskrunner.NewTaskRunner(config) + require.NoError(err) go tr.Run() defer func() { // Make sure we always shutdown task runner when the test exits @@ -150,14 +164,14 @@ func TestConsul_Integration(t *testing.T) { case <-tr.WaitCh(): // Exited cleanly, no need to kill default: - tr.Kill("", "", true) // just in case + tr.Kill(context.Background(), &structs.TaskEvent{}) // just in case } }() // Block waiting for the service to appear catalog := consulClient.Catalog() res, meta, err := catalog.Service("httpd2", "test", nil) - assert.Nil(err) + require.Nil(err) for i := 0; len(res) == 0 && i < 10; i++ { //Expected initial request to fail, do a blocking query @@ -166,7 +180,7 @@ func TestConsul_Integration(t *testing.T) { t.Fatalf("error querying for service: %v", err) } } - assert.Len(res, 1) + require.Len(res, 1) // Truncate results res = res[:] @@ -174,16 +188,16 @@ func TestConsul_Integration(t *testing.T) { // Assert the service with the checks exists for i := 0; len(res) == 0 && i < 10; i++ { res, meta, err = catalog.Service("httpd", "http", &consulapi.QueryOptions{WaitIndex: meta.LastIndex + 1, WaitTime: 3 * time.Second}) - assert.Nil(err) + require.Nil(err) } - assert.Len(res, 1) + require.Len(res, 1) // Assert the script check passes (mock_driver script checks always // pass) after having time to run once time.Sleep(2 * time.Second) checks, _, err := consulClient.Health().Checks("httpd", nil) - assert.Nil(err) - assert.Len(checks, 2) + require.Nil(err) + require.Len(checks, 2) for _, check := range checks { if expected := "httpd"; check.ServiceName != expected { @@ -220,10 +234,10 @@ func TestConsul_Integration(t *testing.T) { t.Fatalf("Unexpected number of checks registered. Got %d; want 2", cnum) } - logger.Printf("[TEST] consul.test: killing task") + logger.Debug("killing task") // Kill the task - tr.Kill("", "", false) + tr.Kill(context.Background(), &structs.TaskEvent{}) select { case <-tr.WaitCh(): @@ -238,7 +252,7 @@ func TestConsul_Integration(t *testing.T) { // Ensure Consul is clean services, _, err := catalog.Services(nil) - assert.Nil(err) - assert.Len(services, 1) - assert.Contains(services, "consul") + require.Nil(err) + require.Len(services, 1) + require.Contains(services, "consul") } diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 285746213..25dbf2bfb 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -85,8 +85,9 @@ type restartRecorder struct { restarts int64 } -func (r *restartRecorder) Restart(source, reason string, failure bool) { +func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { atomic.AddInt64(&r.restarts, 1) + return nil } // testFakeCtx contains a fake Consul AgentAPI diff --git a/drivers/rawexec/driver_test.go b/drivers/rawexec/driver_test.go index 1c8ed6dbf..dad5810c5 100644 --- a/drivers/rawexec/driver_test.go +++ b/drivers/rawexec/driver_test.go @@ -292,6 +292,8 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { // Task should terminate quickly waitCh, err := harness.WaitTask(context.Background(), task.ID) + require.NoError(err) + select { case res := <-waitCh: require.NoError(res.Err) diff --git a/drivers/rawexec/handle.go b/drivers/rawexec/handle.go index 09fe04e81..d4c8847bb 100644 --- a/drivers/rawexec/handle.go +++ b/drivers/rawexec/handle.go @@ -32,7 +32,7 @@ func (h *rawExecTaskHandle) IsRunning() bool { func (h *rawExecTaskHandle) run() { - // since run is called immediatly after the handle is created this + // since run is called immediately after the handle is created this // ensures the exitResult is initialized so we avoid a nil pointer // thus it does not need to be included in the lock if h.exitResult == nil { diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index 1f47a7165..fec8804d4 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -68,7 +68,7 @@ type Fingerprint struct { Health HealthState HealthDescription string - // Err is set by the plugin if an error occured during fingerprinting + // Err is set by the plugin if an error occurred during fingerprinting Err error } @@ -207,7 +207,7 @@ type TaskEvent struct { Message string Annotations map[string]string - // Err is only used if an error occured while consuming the RPC stream + // Err is only used if an error occurred while consuming the RPC stream Err error }