tr: implement stats collection hook

Tested except for the net/rpc specific error case which may need
changing in the gRPC world.
This commit is contained in:
Michael Schurter 2018-09-14 17:08:26 -07:00
parent 86bd329539
commit e6e2930a00
8 changed files with 445 additions and 13 deletions

View File

@ -416,6 +416,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
// Stop tasks
ar.tasksLock.RLock()
for name, tr := range ar.tasks {
err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled))
if err != nil {
@ -426,6 +427,7 @@ func (ar *allocRunner) Destroy() {
}
}
}
ar.tasksLock.RUnlock()
// Wait for tasks to exit and postrun hooks to finish
<-ar.waitCh
@ -474,15 +476,38 @@ func (ar *allocRunner) IsMigrating() bool {
return ar.prevAllocWatcher.IsMigrating()
}
// StatsReporter needs implementing
//XXX
func (ar *allocRunner) StatsReporter() allocrunner.AllocStatsReporter {
return noopStatsReporter{}
return ar
}
//FIXME implement
type noopStatsReporter struct{}
// LatestAllocStats returns the latest stats for an allocation. If taskFilter
// is set, only stats for that task -- if it exists -- are returned.
func (ar *allocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
ar.tasksLock.RLock()
defer ar.tasksLock.RUnlock()
func (noopStatsReporter) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) {
return nil, fmt.Errorf("not implemented")
astat := &cstructs.AllocResourceUsage{
Tasks: make(map[string]*cstructs.TaskResourceUsage, len(ar.tasks)),
ResourceUsage: &cstructs.ResourceUsage{
MemoryStats: &cstructs.MemoryStats{},
CpuStats: &cstructs.CpuStats{},
},
}
for name, tr := range ar.tasks {
if taskFilter != "" && taskFilter != name {
// Getting stats for a particular task and its not this one!
continue
}
if usage := tr.LatestResourceUsage(); usage != nil {
astat.Tasks[name] = usage
astat.ResourceUsage.Add(usage.ResourceUsage)
if usage.Timestamp > astat.Timestamp {
astat.Timestamp = usage.Timestamp
}
}
}
return astat, nil
}

View File

@ -75,6 +75,11 @@ type TaskPrestartHook interface {
Prestart(context.Context, *TaskPrestartRequest, *TaskPrestartResponse) error
}
// DriverStats is the interface implemented by DriverHandles to return task stats.
type DriverStats interface {
Stats() (*cstructs.TaskResourceUsage, error)
}
type TaskPoststartRequest struct {
// Exec hook (may be nil)
DriverExec driver.ScriptExecutor
@ -84,6 +89,9 @@ type TaskPoststartRequest struct {
// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
// Stats collector
DriverStats DriverStats
}
type TaskPoststartResponse struct{}

View File

@ -122,7 +122,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
return h.consul.UpdateTask(oldTaskServices, newTaskServices)
}
func (h *serviceHook) Exited(ctx context.Context, req *interfaces.TaskExitedRequest, _ *interfaces.TaskExitedResponse) error {
func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
h.mu.Lock()
defer h.mu.Unlock()

View File

@ -0,0 +1,117 @@
package taskrunner
import (
"context"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
)
type StatsUpdater interface {
UpdateStats(*cstructs.TaskResourceUsage)
}
type statsHook struct {
updater StatsUpdater
interval time.Duration
// stopCh is closed by Exited
stopCh chan struct{}
mu sync.Mutex
logger hclog.Logger
}
func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
h := &statsHook{
updater: su,
interval: interval,
}
h.logger = logger.Named(h.Name())
return h
}
func (*statsHook) Name() string {
return "stats_hook"
}
func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
// This shouldn't happen, but better safe than risk leaking a goroutine
if h.stopCh != nil {
h.logger.Debug("poststart called twice without exiting between")
close(h.stopCh)
}
h.stopCh = make(chan struct{})
go h.collectResourceUsageStats(h.logger, req.DriverStats, h.stopCh)
return nil
}
func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
if h.stopCh == nil {
// No stats running
return nil
}
// Close chan to stop stats collection
close(h.stopCh)
// Clear chan so we don't double close for any reason
h.stopCh = nil
return nil
}
// collectResourceUsageStats starts collecting resource usage stats of a Task.
// Collection ends when the passed channel is closed
func (h *statsHook) collectResourceUsageStats(logger hclog.Logger, handle interfaces.DriverStats, stopCh <-chan struct{}) {
// start collecting the stats right away and then start collecting every
// collection interval
next := time.NewTimer(0)
defer next.Stop()
for {
select {
case <-next.C:
// Reset the timer
next.Reset(h.interval)
// Collect stats from driver
ru, err := handle.Stats()
if err != nil {
// Check if the driver doesn't implement stats
if err.Error() == driver.DriverStatsNotImplemented.Error() {
h.logger.Debug("driver does not support stats")
return
}
//XXX This is a net/rpc specific error
// We do not log when the plugin is shutdown as this is simply a
// race between the stopCollection channel being closed and calling
// Stats on the handle.
if !strings.Contains(err.Error(), "connection is shut down") {
h.logger.Debug("error fetching stats of task", "error", err)
}
continue
}
// Update stats on TaskRunner and emit them
h.updater.UpdateStats(ru)
case <-stopCh:
return
}
}
}

View File

@ -0,0 +1,180 @@
package taskrunner
import (
"context"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/driver"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/stretchr/testify/require"
)
// Statically assert the stats hook implements the expected interfaces
var _ interfaces.TaskPoststartHook = (*statsHook)(nil)
var _ interfaces.TaskExitedHook = (*statsHook)(nil)
type mockStatsUpdater struct {
// Ch is sent task resource usage updates if not nil
Ch chan *cstructs.TaskResourceUsage
}
// newMockStatsUpdater returns a mockStatsUpdater that blocks on Ch for every
// call to UpdateStats
func newMockStatsUpdater() *mockStatsUpdater {
return &mockStatsUpdater{
Ch: make(chan *cstructs.TaskResourceUsage),
}
}
func (m *mockStatsUpdater) UpdateStats(ru *cstructs.TaskResourceUsage) {
if m.Ch != nil {
m.Ch <- ru
}
}
type mockDriverStats struct {
// err is returned by Stats if it is non-nil
err error
}
func (m *mockDriverStats) Stats() (*cstructs.TaskResourceUsage, error) {
if m.err != nil {
return nil, m.err
}
ru := &cstructs.TaskResourceUsage{
ResourceUsage: &cstructs.ResourceUsage{
MemoryStats: &cstructs.MemoryStats{
RSS: 1,
Measured: []string{"RSS"},
},
CpuStats: &cstructs.CpuStats{
SystemMode: 1,
Measured: []string{"System Mode"},
},
},
Timestamp: time.Now().UnixNano(),
Pids: map[string]*cstructs.ResourceUsage{},
}
ru.Pids["task"] = ru.ResourceUsage
return ru, nil
}
// TestTaskRunner_StatsHook_PoststartExited asserts the stats hook starts and
// stops.
func TestTaskRunner_StatsHook_PoststartExited(t *testing.T) {
t.Parallel()
require := require.New(t)
logger := testlog.HCLogger(t)
su := newMockStatsUpdater()
ds := new(mockDriverStats)
poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
// Create hook
h := newStatsHook(su, time.Minute, logger)
// Always call Exited to cleanup goroutines
defer h.Exited(context.Background(), nil, nil)
// Run prestart
require.NoError(h.Poststart(context.Background(), poststartReq, nil))
// An initial stats collection should run and call the updater
select {
case ru := <-su.Ch:
require.Equal(uint64(1), ru.ResourceUsage.MemoryStats.RSS)
case <-time.After(10 * time.Second):
t.Fatalf("timeout waiting for initial stats collection")
}
require.NoError(h.Exited(context.Background(), nil, nil))
}
// TestTaskRunner_StatsHook_Periodic asserts the stats hook collects stats on
// an interval.
func TestTaskRunner_StatsHook_Periodic(t *testing.T) {
t.Parallel()
require := require.New(t)
logger := testlog.HCLogger(t)
su := newMockStatsUpdater()
ds := new(mockDriverStats)
poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
// interval needs to be high enough that even on a slow/busy VM
// Exited() can complete within the interval.
const interval = 500 * time.Millisecond
h := newStatsHook(su, interval, logger)
defer h.Exited(context.Background(), nil, nil)
// Run prestart
require.NoError(h.Poststart(context.Background(), poststartReq, nil))
// An initial stats collection should run and call the updater
var firstrun int64
select {
case ru := <-su.Ch:
if ru.Timestamp <= 0 {
t.Fatalf("expected nonzero timestamp (%v)", ru.Timestamp)
}
firstrun = ru.Timestamp
case <-time.After(10 * time.Second):
t.Fatalf("timeout waiting for initial stats collection")
}
// Should get another update in ~500ms (see interval above)
select {
case ru := <-su.Ch:
if ru.Timestamp <= firstrun {
t.Fatalf("expected timestamp (%v) after first run (%v)", ru.Timestamp, firstrun)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout waiting for second stats collection")
}
// Exiting should prevent further updates
require.NoError(h.Exited(context.Background(), nil, nil))
// Should *not* get another update in ~500ms (see interval above)
select {
case ru := <-su.Ch:
t.Fatalf("unexpected update after exit (firstrun=%v; update=%v", firstrun, ru.Timestamp)
case <-time.After(2 * interval):
// Ok! No update after exit as expected.
}
}
// TestTaskRunner_StatsHook_NotImplemented asserts the stats hook stops if the
// driver returns NotImplemented.
func TestTaskRunner_StatsHook_NotImplemented(t *testing.T) {
t.Parallel()
require := require.New(t)
logger := testlog.HCLogger(t)
su := newMockStatsUpdater()
ds := &mockDriverStats{
err: driver.DriverStatsNotImplemented,
}
poststartReq := &interfaces.TaskPoststartRequest{DriverStats: ds}
h := newStatsHook(su, 1, logger)
defer h.Exited(context.Background(), nil, nil)
// Run prestart
require.NoError(h.Poststart(context.Background(), poststartReq, nil))
// An initial stats collection should run and *not* call the updater
select {
case ru := <-su.Ch:
t.Fatalf("unexpected resource update (timestamp=%v)", ru.Timestamp)
case <-time.After(500 * time.Millisecond):
// Ok! No update received because error was returned
}
}

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/env"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -129,6 +130,11 @@ type TaskRunner struct {
// baseLabels are used when emitting tagged metrics. All task runner metrics
// will have these tags, and optionally more.
baseLabels []metrics.Label
// resourceUsage is written via UpdateStats and read via
// LatestResourceUsage. May be nil at all times.
resourceUsage *cstructs.TaskResourceUsage
resourceUsageLock sync.Mutex
}
type Config struct {
@ -659,6 +665,98 @@ func (tr *TaskRunner) triggerUpdateHooks() {
}
}
// LatestResourceUsage returns the last resource utilization datapoint
// collected. May return nil if the task is not running or no resource
// utilization has been collected yet.
func (tr *TaskRunner) LatestResourceUsage() *cstructs.TaskResourceUsage {
tr.resourceUsageLock.Lock()
ru := tr.resourceUsage
tr.resourceUsageLock.Unlock()
return ru
}
// UpdateStats updates and emits the latest stats from the driver.
func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) {
tr.resourceUsageLock.Lock()
tr.resourceUsage = ru
tr.resourceUsageLock.Unlock()
if ru != nil {
tr.emitStats(ru)
}
}
//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
float32(ru.ResourceUsage.MemoryStats.RSS), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"},
float32(ru.ResourceUsage.MemoryStats.Cache), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"},
float32(ru.ResourceUsage.MemoryStats.Swap), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"},
float32(ru.ResourceUsage.MemoryStats.MaxUsage), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelUsage), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), tr.baseLabels)
}
if tr.clientConfig.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "max_usage"}, float32(ru.ResourceUsage.MemoryStats.MaxUsage))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
}
}
//TODO Remove Backwardscompat or use tr.Alloc()?
func (tr *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
float32(ru.ResourceUsage.CpuStats.Percent), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
float32(ru.ResourceUsage.CpuStats.SystemMode), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
float32(ru.ResourceUsage.CpuStats.UserMode), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
float32(ru.ResourceUsage.CpuStats.ThrottledTime), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), tr.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
float32(ru.ResourceUsage.CpuStats.TotalTicks), tr.baseLabels)
}
if tr.clientConfig.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_time"}, float32(ru.ResourceUsage.CpuStats.ThrottledTime))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "throttled_periods"}, float32(ru.ResourceUsage.CpuStats.ThrottledPeriods))
metrics.SetGauge([]string{"client", "allocs", tr.alloc.Job.Name, tr.alloc.TaskGroup, tr.allocID, tr.taskName, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks))
}
}
// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
if !tr.clientConfig.PublishAllocationMetrics {
return
}
if ru.ResourceUsage.MemoryStats != nil {
tr.setGaugeForMemory(ru)
}
if ru.ResourceUsage.CpuStats != nil {
tr.setGaugeForCPU(ru)
}
}
// appendTaskEvent updates the task status by appending the new event.
func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
const capacity = 10

View File

@ -23,6 +23,7 @@ func (tr *TaskRunner) initHooks() {
newTaskDirHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
newShutdownDelayHook(task.ShutdownDelay, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
}
// If Vault is enabled, add the hook
@ -186,6 +187,7 @@ func (tr *TaskRunner) poststart() error {
req := interfaces.TaskPoststartRequest{
DriverExec: handle,
DriverNetwork: net,
DriverStats: handle,
TaskEnv: tr.envBuilder.Build(),
}
var resp interfaces.TaskPoststartResponse

View File

@ -99,19 +99,21 @@ type ClientStatsReporter interface {
LatestHostStats() *stats.HostStats
}
// AllocRunner is the interface implemented by the core alloc runner.
//TODO Create via factory to allow testing Client with mock AllocRunners.
type AllocRunner interface {
StatsReporter() allocrunner.AllocStatsReporter
Alloc() *structs.Allocation
Destroy()
GetAllocDir() *allocdir.AllocDir
IsDestroyed() bool
IsWaiting() bool
IsMigrating() bool
IsWaiting() bool
Listener() *cstructs.AllocListener
WaitCh() <-chan struct{}
Update(*structs.Allocation)
Alloc() *structs.Allocation
Restore() error
Run()
StatsReporter() allocrunner.AllocStatsReporter
Update(*structs.Allocation)
WaitCh() <-chan struct{}
}
// Client is used to implement the client interaction with Nomad. Clients