139 lines
3.2 KiB
Go
139 lines
3.2 KiB
Go
package taskrunner
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
|
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
|
|
)
|
|
|
|
// StatsUpdater is the interface required by the StatsHook to update stats.
|
|
// Satisfied by TaskRunner.
|
|
type StatsUpdater interface {
|
|
UpdateStats(*cstructs.TaskResourceUsage)
|
|
}
|
|
|
|
// statsHook manages the task stats collection goroutine.
|
|
type statsHook struct {
|
|
updater StatsUpdater
|
|
interval time.Duration
|
|
|
|
// stopCh is closed by Exited or Canceled
|
|
stopCh chan struct{}
|
|
|
|
mu sync.Mutex
|
|
|
|
logger hclog.Logger
|
|
}
|
|
|
|
func newStatsHook(su StatsUpdater, interval time.Duration, logger hclog.Logger) *statsHook {
|
|
h := &statsHook{
|
|
updater: su,
|
|
interval: interval,
|
|
}
|
|
h.logger = logger.Named(h.Name())
|
|
return h
|
|
}
|
|
|
|
func (*statsHook) Name() string {
|
|
return "stats_hook"
|
|
}
|
|
|
|
func (h *statsHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
// This shouldn't happen, but better safe than risk leaking a goroutine
|
|
if h.stopCh != nil {
|
|
h.logger.Debug("poststart called twice without exiting between")
|
|
close(h.stopCh)
|
|
}
|
|
|
|
h.stopCh = make(chan struct{})
|
|
go h.collectResourceUsageStats(req.DriverStats, h.stopCh)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if h.stopCh == nil {
|
|
// No stats running
|
|
return nil
|
|
}
|
|
|
|
// Close chan to stop stats collection
|
|
close(h.stopCh)
|
|
|
|
// Clear chan so we don't double close for any reason
|
|
h.stopCh = nil
|
|
|
|
return nil
|
|
}
|
|
|
|
// collectResourceUsageStats starts collecting resource usage stats of a Task.
|
|
// Collection ends when the passed channel is closed
|
|
func (h *statsHook) collectResourceUsageStats(handle interfaces.DriverStats, stopCh <-chan struct{}) {
|
|
// start collecting the stats right away and then start collecting every
|
|
// collection interval
|
|
next := time.NewTimer(0)
|
|
defer next.Stop()
|
|
for {
|
|
select {
|
|
case <-next.C:
|
|
// Reset the timer
|
|
next.Reset(h.interval)
|
|
|
|
// Collect stats from driver
|
|
ru, err := handle.Stats()
|
|
if err != nil {
|
|
// Check if the driver doesn't implement stats
|
|
if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
|
|
h.logger.Debug("driver does not support stats")
|
|
return
|
|
}
|
|
|
|
// We do not log when the plugin is shutdown as this is either:
|
|
// - A race between the stopCollection channel being closed and
|
|
// calling Stats on the handle.
|
|
// - The driver plugin has unexpectedly exited
|
|
//
|
|
// In either case sleeping and trying again or returning based
|
|
// on the stop channel is the correct behavior
|
|
if err != bstructs.ErrPluginShutdown {
|
|
h.logger.Debug("error fetching stats of task", "error", err)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
// Update stats on TaskRunner and emit them
|
|
h.updater.UpdateStats(ru)
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *statsHook) Shutdown() {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if h.stopCh == nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-h.stopCh:
|
|
// Already closed
|
|
default:
|
|
close(h.stopCh)
|
|
}
|
|
}
|