From e6e2930a00a90d2bdc31a3bd442f01ffa512e709 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Fri, 14 Sep 2018 17:08:26 -0700
Subject: [PATCH] tr: implement stats collection hook

Tested except for the net/rpc-specific error case, which may need
changing in the gRPC world.
---
 client/allocrunnerv2/alloc_runner.go          |  39 +++-
 .../interfaces/task_lifecycle.go              |   8 +
 .../allocrunnerv2/taskrunner/service_hook.go  |   2 +-
 client/allocrunnerv2/taskrunner/stats_hook.go | 117 ++++++++++++
 .../taskrunner/stats_hook_test.go             | 180 ++++++++++++++++++
 .../allocrunnerv2/taskrunner/task_runner.go   |  98 ++++++++++
 .../taskrunner/task_runner_hooks.go           |   2 +
 client/client.go                              |  12 +-
 8 files changed, 445 insertions(+), 13 deletions(-)
 create mode 100644 client/allocrunnerv2/taskrunner/stats_hook.go
 create mode 100644 client/allocrunnerv2/taskrunner/stats_hook_test.go

diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go
index dc65e180b..73e5773cd 100644
--- a/client/allocrunnerv2/alloc_runner.go
+++ b/client/allocrunnerv2/alloc_runner.go
@@ -416,6 +416,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
 // exit (thus closing WaitCh).
 func (ar *allocRunner) Destroy() {
 	// Stop tasks
+	ar.tasksLock.RLock()
 	for name, tr := range ar.tasks {
 		err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
 		if err != nil {
@@ -426,6 +427,7 @@ func (ar *allocRunner) Destroy() {
 			}
 		}
 	}
+	ar.tasksLock.RUnlock()
 
 	// Wait for tasks to exit and postrun hooks to finish
 	<-ar.waitCh
@@ -474,15 +476,38 @@ func (ar *allocRunner) IsMigrating() bool {
 	return ar.prevAllocWatcher.IsMigrating()
 }
 
-// StatsReporter needs implementing
-//XXX
 func (ar *allocRunner) StatsReporter() allocrunner.AllocStatsReporter {
-	return noopStatsReporter{}
+	return ar
 }
 
-//FIXME implement
-type noopStatsReporter struct{}
+// LatestAllocStats returns the latest stats for an allocation. If taskFilter
+// is set, only stats for that task -- if it exists -- are returned.
+func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
+	ar.tasksLock.RLock()
+	defer ar.tasksLock.RUnlock()
 
-func (noopStatsReporter) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
-	return nil, fmt.Errorf("not implemented")
+	astat := &cstructs.AllocResourceUsage{
+		Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)),
+		ResourceUsage: &cstructs.ResourceUsage{
+			MemoryStats: &cstructs.MemoryStats{},
+			CpuStats:    &cstructs.CpuStats{},
+		},
+	}
+
+	for name, tr := range ar.tasks {
+		if taskFilter != "" && taskFilter != name {
+			// Getting stats for a particular task and it's not this one!
+			continue
+		}
+
+		if usage := tr.LatestResourceUsage(); usage != nil {
+			astat.Tasks[name] = usage
+			astat.ResourceUsage.Add(usage.ResourceUsage)
+			if usage.Timestamp > astat.Timestamp {
+				astat.Timestamp = usage.Timestamp
+			}
+		}
+	}
+
+	return astat, nil
 }
diff --git a/client/allocrunnerv2/interfaces/task_lifecycle.go b/client/allocrunnerv2/interfaces/task_lifecycle.go
index 4de78c37a..8be2bfd6e 100644
--- a/client/allocrunnerv2/interfaces/task_lifecycle.go
+++ b/client/allocrunnerv2/interfaces/task_lifecycle.go
@@ -75,6 +75,11 @@ type TaskPrestartHook interface {
 	Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error
 }
 
+// DriverStats is the interface implemented by DriverHandles to return task stats.
+type DriverStats interface {
+	Stats() (*cstructs.TaskResourceUsage, error)
+}
+
 type TaskPoststartRequest struct {
 	// Exec hook (may be nil)
 	DriverExec driver.ScriptExecutor
@@ -84,6 +89,9 @@ type TaskPoststartRequest struct {
 
 	// TaskEnv is the task's environment
 	TaskEnv *env.TaskEnv
+
+	// Stats collector
+	DriverStats DriverStats
 }
 
 type TaskPoststartResponse struct{}
diff --git a/client/allocrunnerv2/taskrunner/service_hook.go b/client/allocrunnerv2/taskrunner/service_hook.go
index c1097e36e..28ed0c8aa 100644
--- a/client/allocrunnerv2/taskrunner/service_hook.go
+++ b/client/allocrunnerv2/taskrunner/service_hook.go
@@ -122,7 +122,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
 	return h.consul.UpdateTask(oldTaskServices, newTaskServices)
 }
 
-func (h *serviceHook) Exited(ctx context.Context, req *interfaces.TaskExitedRequest, _ *interfaces.TaskExitedResponse) error {
+func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
diff --git a/client/allocrunnerv2/taskrunner/stats_hook.go b/client/allocrunnerv2/taskrunner/stats_hook.go
new file mode 100644
index 000000000..a309e7c34
--- /dev/null
+++ b/client/allocrunnerv2/taskrunner/stats_hook.go
@@ -0,0 +1,117 @@
+package taskrunner
+
+import (
+	"context"
+	"strings"
+	"sync"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
+	"github.com/hashicorp/nomad/client/driver"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+)
+
+type StatsUpdater interface {
+	UpdateStats(*cstructs.TaskResourceUsage)
+}
+
+type statsHook struct {
+	updater  StatsUpdater
+	interval time.Duration
+
+	// stopCh is closed by Exited
+	stopCh chan struct{}
+
+	mu sync.Mutex
+
+	logger hclog.Logger
+}
+
+func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
+	h := &statsHook{
+		updater:  su,
+		interval: interval,
+	}
+	h.logger = logger.Named(h.Name())
+	return h
+}
+
+func (*statsHook) Name() string {
+	return "stats_hook"
+}
+
+func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	// This shouldn't happen, but better safe than risk leaking a goroutine
+	if h.stopCh != nil {
+		h.logger.Debug("poststart called twice without exiting between")
+		close(h.stopCh)
+	}
+
+	h.stopCh = make(chan struct{})
+	go h.collectResourceUsageStats(h.logger, req.DriverStats, h.stopCh)
+
+	return nil
+}
+
+func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	if h.stopCh == nil {
+		// No stats running
+		return nil
+	}
+
+	// Close chan to stop stats collection
+	close(h.stopCh)
+
+	// Clear chan so we don't double close for any reason
+	h.stopCh = nil
+
+	return nil
+}
+
+// collectResourceUsageStats starts collecting resource usage stats of a Task.
+// Collection ends when the passed channel is closed.
+func (h *statsHook) collectResourceUsageStats(logger hclog.Logger, handle interfaces.DriverStats, stopCh <-chan struct{}) {
+	// Collect stats right away and then continue collecting on every
+	// collection interval
+	next := time.NewTimer(0)
+	defer next.Stop()
+	for {
+		select {
+		case <-next.C:
+			// Reset the timer
+			next.Reset(h.interval)
+
+			// Collect stats from driver
+			ru, err := handle.Stats()
+			if err != nil {
+				// Check if the driver doesn't implement stats
+				if err.Error() == driver.DriverStatsNotImplemented.Error() {
+					h.logger.Debug("driver does not support stats")
+					return
+				}
+
+				//XXX This is a net/rpc specific error
+				// We do not log when the plugin is shut down since this is simply a
+				// race between the stopCollection channel being closed and calling
+				// Stats on the handle.
+				if !strings.Contains(err.Error(), "connection is shut down") {
+					h.logger.Debug("error fetching stats of task", "error", err)
+				}
+
+				continue
+			}
+
+			// Update stats on TaskRunner and emit them
+			h.updater.UpdateStats(ru)
+		case <-stopCh:
+			return
+		}
+	}
+}
diff --git a/client/allocrunnerv2/taskrunner/stats_hook_test.go b/client/allocrunnerv2/taskrunner/stats_hook_test.go
new file mode 100644
index 000000000..98c9854c3
--- /dev/null
+++ b/client/allocrunnerv2/taskrunner/stats_hook_test.go
@@ -0,0 +1,180 @@
+package taskrunner
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
+	"github.com/hashicorp/nomad/client/driver"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/stretchr/testify/require"
+)
+
+// Statically assert the stats hook implements the expected interfaces
+var _ interfaces.TaskPoststartHook = (*statsHook)(nil)
+var _ interfaces.TaskExitedHook = (*statsHook)(nil)
+
+type mockStatsUpdater struct {
+	// Ch receives task resource usage updates if it is non-nil
+	Ch chan *cstructs.TaskResourceUsage
+}
+
+// newMockStatsUpdater returns a mockStatsUpdater that blocks on Ch for every
+// call to UpdateStats
+func newMockStatsUpdater() *mockStatsUpdater {
+	return &mockStatsUpdater{
+		Ch: make(chan *cstructs.TaskResourceUsage),
+	}
+}
+
+func (m *mockStatsUpdater) UpdateStats(ru *cstructs.TaskResourceUsage) {
+	if m.Ch != nil {
+		m.Ch <- ru
+	}
+}
+
+type mockDriverStats struct {
+	// err is returned by Stats if it is non-nil
+	err error
+}
+
+func (m *mockDriverStats) Stats() (*cstructs.TaskResourceUsage, error) {
+	if m.err != nil {
+		return nil, m.err
+	}
+	ru := &cstructs.TaskResourceUsage{
+		ResourceUsage: &cstructs.ResourceUsage{
+			MemoryStats: &cstructs.MemoryStats{
+				RSS:      1,
+				Measured: []string{"RSS"},
+			},
+			CpuStats: &cstructs.CpuStats{
+				SystemMode: 1,
+				Measured:   []string{"System Mode"},
+			},
+		},
+		Timestamp: time.Now().UnixNano(),
+		Pids:      map[string]*cstructs.ResourceUsage{},
+	}
+	ru.Pids["task"] = ru.ResourceUsage
+	return ru, nil
+}
+
+// TestTaskRunner_StatsHook_PoststartExited asserts the stats hook starts and
+// stops.
+func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+	logger := testlog.HCLogger(t)
+	su := newMockStatsUpdater()
+	ds := new(mockDriverStats)
+
+	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
+
+	// Create hook
+	h := newStatsHook(su, time.Minute, logger)
+
+	// Always call Exited to clean up goroutines
+	defer h.Exited(context.Background(), nil, nil)
+
+	// Run poststart
+	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+
+	// An initial stats collection should run and call the updater
+	select {
+	case ru := <-su.Ch:
+		require.Equal(uint64(1), ru.ResourceUsage.MemoryStats.RSS)
+	case <-time.After(10 * time.Second):
+		t.Fatalf("timeout waiting for initial stats collection")
+	}
+
+	require.NoError(h.Exited(context.Background(), nil, nil))
+}
+
+// TestTaskRunner_StatsHook_Periodic asserts the stats hook collects stats on
+// an interval.
+func TestTaskRunner_StatsHook_Periodic(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+	logger := testlog.HCLogger(t)
+	su := newMockStatsUpdater()
+
+	ds := new(mockDriverStats)
+	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
+
+	// interval needs to be high enough that even on a slow/busy VM
+	// Exited() can complete within the interval.
+	const interval = 500 * time.Millisecond
+
+	h := newStatsHook(su, interval, logger)
+	defer h.Exited(context.Background(), nil, nil)
+
+	// Run poststart
+	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+
+	// An initial stats collection should run and call the updater
+	var firstrun int64
+	select {
+	case ru := <-su.Ch:
+		if ru.Timestamp <= 0 {
+			t.Fatalf("expected nonzero timestamp (%v)", ru.Timestamp)
+		}
+		firstrun = ru.Timestamp
+	case <-time.After(10 * time.Second):
+		t.Fatalf("timeout waiting for initial stats collection")
+	}
+
+	// Should get another update in ~500ms (see interval above)
+	select {
+	case ru := <-su.Ch:
+		if ru.Timestamp <= firstrun {
+			t.Fatalf("expected timestamp (%v) after first run (%v)", ru.Timestamp, firstrun)
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatalf("timeout waiting for second stats collection")
+	}
+
+	// Exiting should prevent further updates
+	require.NoError(h.Exited(context.Background(), nil, nil))
+
+	// Should *not* get another update in ~500ms (see interval above)
+	select {
+	case ru := <-su.Ch:
+		t.Fatalf("unexpected update after exit (firstrun=%v; update=%v)", firstrun, ru.Timestamp)
+	case <-time.After(2 * interval):
+		// Ok! No update after exit as expected.
+	}
+}
+
+// TestTaskRunner_StatsHook_NotImplemented asserts the stats hook stops if the
+// driver returns NotImplemented.
+func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+	logger := testlog.HCLogger(t)
+	su := newMockStatsUpdater()
+	ds := &mockDriverStats{
+		err: driver.DriverStatsNotImplemented,
+	}
+
+	poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
+
+	h := newStatsHook(su, 1, logger)
+	defer h.Exited(context.Background(), nil, nil)
+
+	// Run poststart
+	require.NoError(h.Poststart(context.Background(), poststartReq, nil))
+
+	// An initial stats collection should run and *not* call the updater
+	select {
+	case ru := <-su.Ch:
+		t.Fatalf("unexpected resource update (timestamp=%v)", ru.Timestamp)
+	case <-time.After(500 * time.Millisecond):
+		// Ok! No update received because error was returned
+	}
+}
diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go
index 3118626ef..11da80637 100644
--- a/client/allocrunnerv2/taskrunner/task_runner.go
+++ b/client/allocrunnerv2/taskrunner/task_runner.go
@@ -17,6 +17,7 @@ import (
 	"github.com/hashicorp/nomad/client/driver"
 	"github.com/hashicorp/nomad/client/driver/env"
 	cstate "github.com/hashicorp/nomad/client/state"
+	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/client/vaultclient"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -129,6 +130,11 @@ type TaskRunner struct {
 	// baseLabels are used when emitting tagged metrics. All task runner metrics
 	// will have these tags, and optionally more.
 	baseLabels []metrics.Label
+
+	// resourceUsage is written via UpdateStats and read via
+	// LatestResourceUsage. May be nil if stats have never been collected.
+	resourceUsage     *cstructs.TaskResourceUsage
+	resourceUsageLock sync.Mutex
 }
 
 type Config struct {
@@ -659,6 +665,98 @@ func (tr *TaskRunner) triggerUpdateHooks() {
 	}
 }
 
+// LatestResourceUsage returns the last resource utilization datapoint
+// collected. May return nil if the task is not running or no resource
+// utilization has been collected yet.
+func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
+	tr.resourceUsageLock.Lock()
+	ru := tr.resourceUsage
+	tr.resourceUsageLock.Unlock()
+	return ru
+}
+
+// UpdateStats updates and emits the latest stats from the driver.
+func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
+	tr.resourceUsageLock.Lock()
+	tr.resourceUsage = ru
+	tr.resourceUsageLock.Unlock()
+	if ru != nil {
+		tr.emitStats(ru)
+	}
+}
+
+//TODO Remove Backwardscompat or use tr.Alloc()?
+func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
+	if !tr.clientConfig.DisableTaggedMetrics {
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
+			float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
+			float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"},
+			float32(ru.ResourceUsage.MemoryStats.Cache), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"},
+			float32(ru.ResourceUsage.MemoryStats.Swap), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"},
+			float32(ru.ResourceUsage.MemoryStats.MaxUsage), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"},
+			float32(ru.ResourceUsage.MemoryStats.KernelUsage), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"},
+			float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), tr.baseLabels)
+	}
+
+	if tr.clientConfig.BackwardsCompatibleMetrics {
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "max_usage"}, float32(ru.ResourceUsage.MemoryStats.MaxUsage))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
+	}
+}
+
+//TODO Remove Backwardscompat or use tr.Alloc()?
+func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
+	if !tr.clientConfig.DisableTaggedMetrics {
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
+			float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
+			float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
+			float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
+			float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
+			float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
+		metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
+			float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
+	}
+
+	if tr.clientConfig.BackwardsCompatibleMetrics {
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_time"}, float32(ru.ResourceUsage.CpuStats.ThrottledTime))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_periods"}, float32(ru.ResourceUsage.CpuStats.ThrottledPeriods))
+		metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks))
+	}
+}
+
+// emitStats emits resource usage stats of tasks to remote metrics collector
+// sinks
+func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
+	if !tr.clientConfig.PublishAllocationMetrics {
+		return
+	}
+
+	if ru.ResourceUsage.MemoryStats != nil {
+		tr.setGaugeForMemory(ru)
+	}
+
+	if ru.ResourceUsage.CpuStats != nil {
+		tr.setGaugeForCPU(ru)
+	}
+}
+
 // appendTaskEvent updates the task status by appending the new event.
 func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
 	const capacity = 10
diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go
index bad2c84a3..b377fd7d3 100644
--- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go
+++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go
@@ -23,6 +23,7 @@ func (tr *TaskRunner) initHooks() {
 		newTaskDirHook(tr, hookLogger),
 		newArtifactHook(tr, hookLogger),
 		newShutdownDelayHook(task.ShutdownDelay, hookLogger),
+		newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
 	}
 
 	// If Vault is enabled, add the hook
@@ -186,6 +187,7 @@ func (tr *TaskRunner) poststart() error {
 	req := interfaces.TaskPoststartRequest{
 		DriverExec:    handle,
 		DriverNetwork: net,
+		DriverStats:   handle,
 		TaskEnv:       tr.envBuilder.Build(),
 	}
 	var resp interfaces.TaskPoststartResponse
diff --git a/client/client.go b/client/client.go
index 09f6d063e..83508769f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -99,19 +99,21 @@ type ClientStatsReporter interface {
 	LatestHostStats() *stats.HostStats
 }
 
+// AllocRunner is the interface implemented by the core alloc runner.
+//TODO Create via factory to allow testing Client with mock AllocRunners.
 type AllocRunner interface {
-	StatsReporter() allocrunner.AllocStatsReporter
+	Alloc() *structs.Allocation
 	Destroy()
 	GetAllocDir() *allocdir.AllocDir
 	IsDestroyed() bool
-	IsWaiting() bool
 	IsMigrating() bool
+	IsWaiting() bool
 	Listener() *cstructs.AllocListener
-	WaitCh() <-chan struct{}
-	Update(*structs.Allocation)
-	Alloc() *structs.Allocation
 	Restore() error
 	Run()
+	StatsReporter() allocrunner.AllocStatsReporter
+	Update(*structs.Allocation)
+	WaitCh() <-chan struct{}
 }
 
 // Client is used to implement the client interaction with Nomad. Clients
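
For reviewers, a minimal sketch of how the new hook is driven through its lifecycle. This is illustrative only and is not applied by the patch: printUpdater, staticStats, and exampleStatsHook are hypothetical names, and the code assumes it sits in the taskrunner package next to stats_hook.go.

package taskrunner

import (
	"context"
	"fmt"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
	cstructs "github.com/hashicorp/nomad/client/structs"
)

// printUpdater is a toy StatsUpdater that prints the RSS of each datapoint.
type printUpdater struct{}

func (printUpdater) UpdateStats(ru *cstructs.TaskResourceUsage) {
	fmt.Println("rss:", ru.ResourceUsage.MemoryStats.RSS)
}

// staticStats is a toy DriverStats handle that returns a fixed datapoint.
type staticStats struct{}

func (staticStats) Stats() (*cstructs.TaskResourceUsage, error) {
	return &cstructs.TaskResourceUsage{
		ResourceUsage: &cstructs.ResourceUsage{
			MemoryStats: &cstructs.MemoryStats{RSS: 42},
			CpuStats:    &cstructs.CpuStats{},
		},
		Timestamp: time.Now().UnixNano(),
	}, nil
}

// exampleStatsHook wires the pieces together the same way the task runner
// does: Poststart starts the collection goroutine against a driver handle,
// the updater receives a datapoint every interval, and Exited stops it.
func exampleStatsHook() error {
	h := newStatsHook(printUpdater{}, time.Second, hclog.Default())

	req := &interfaces.TaskPoststartRequest{DriverStats: staticStats{}}
	if err := h.Poststart(context.Background(), req, nil); err != nil {
		return err
	}

	// ... the task runs; stats flow to the updater once per interval ...
	time.Sleep(3 * time.Second)

	// Closing out the hook stops the collection goroutine.
	return h.Exited(context.Background(), nil, nil)
}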