Collecting host stats
This commit is contained in:
parent
15d867d5f1
commit
7569b1af2e
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/client/fingerprint"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
@ -81,6 +82,11 @@ func DefaultConfig() *config.Config {
|
|||
}
|
||||
}
|
||||
|
||||
type ClientStatsReporter interface {
|
||||
AllocStats() map[string]AllocStatsReporter
|
||||
HostStats() *stats.HostStats
|
||||
}
|
||||
|
||||
// Client is used to implement the client interaction with Nomad. Clients
|
||||
// are expected to register as a schedulable node to the servers, and to
|
||||
// run allocations as determined by the servers.
|
||||
|
@ -116,6 +122,9 @@ type Client struct {
|
|||
|
||||
consulService *consul.ConsulService
|
||||
|
||||
resourceUsage *stats.RingBuff
|
||||
resourceUsageLock sync.RWMutex
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
|
@ -126,15 +135,21 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
|||
// Create a logger
|
||||
logger := log.New(cfg.LogOutput, "", log.LstdFlags)
|
||||
|
||||
resourceUsage, err := stats.NewRingBuff(60)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the client
|
||||
c := &Client{
|
||||
config: cfg,
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
logger: logger,
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
config: cfg,
|
||||
start: time.Now(),
|
||||
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
|
||||
logger: logger,
|
||||
resourceUsage: resourceUsage,
|
||||
allocs: make(map[string]*AllocRunner),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the client
|
||||
|
@ -189,6 +204,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
|
|||
// Start the client!
|
||||
go c.run()
|
||||
|
||||
// Start collecting stats
|
||||
go c.monitorUsage()
|
||||
|
||||
// Start the consul sync
|
||||
go c.syncConsul()
|
||||
|
||||
|
@ -394,12 +412,24 @@ func (c *Client) Node() *structs.Node {
|
|||
return c.config.Node
|
||||
}
|
||||
|
||||
func (c *Client) AllocStats(alloc string) (AllocStatsReporter, error) {
|
||||
ar, ok := c.allocs[alloc]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("allocation: %q not running on this client", alloc)
|
||||
func (c *Client) StatsReporter() ClientStatsReporter {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Client) AllocStats() map[string]AllocStatsReporter {
|
||||
res := make(map[string]AllocStatsReporter)
|
||||
for alloc, ar := range c.allocs {
|
||||
res[alloc] = ar
|
||||
}
|
||||
return ar.StatsReporter(), nil
|
||||
return res
|
||||
}
|
||||
|
||||
func (c *Client) HostStats() *stats.HostStats {
|
||||
val := c.resourceUsage.Peek()
|
||||
if val != nil {
|
||||
return val.(*stats.HostStats)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
|
||||
|
@ -1235,3 +1265,23 @@ func (c *Client) syncConsul() {
|
|||
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) monitorUsage() {
|
||||
for {
|
||||
next := time.NewTimer(1 * time.Second)
|
||||
select {
|
||||
case <-next.C:
|
||||
ru, err := stats.CollectHostStats()
|
||||
if err != nil {
|
||||
c.logger.Printf("[DEBUG] client: error fetching stats of host: %v", err)
|
||||
}
|
||||
c.resourceUsageLock.RLock()
|
||||
c.resourceUsage.Enqueue(ru)
|
||||
c.resourceUsageLock.RUnlock()
|
||||
next.Reset(1 * time.Second)
|
||||
case <-c.shutdownCh:
|
||||
next.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
package stats
|
||||
|
||||
import (
|
||||
"github.com/shirou/gopsutil/cpu"
|
||||
"github.com/shirou/gopsutil/mem"
|
||||
)
|
||||
|
||||
type HostStats struct {
|
||||
Memory *MemoryStats
|
||||
CPU []*CPUStats
|
||||
}
|
||||
|
||||
type MemoryStats struct {
|
||||
Total uint64
|
||||
Available uint64
|
||||
Used uint64
|
||||
Free uint64
|
||||
}
|
||||
|
||||
type CPUStats struct {
|
||||
CPU string
|
||||
User float64
|
||||
System float64
|
||||
Idle float64
|
||||
}
|
||||
|
||||
func CollectHostStats() (*HostStats, error) {
|
||||
memStats, err := mem.VirtualMemory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ms := &MemoryStats{
|
||||
Total: memStats.Total,
|
||||
Available: memStats.Available,
|
||||
Used: memStats.Used,
|
||||
Free: memStats.Free,
|
||||
}
|
||||
|
||||
cpuStats, err := cpu.Times(true)
|
||||
cs := make([]*CPUStats, len(cpuStats))
|
||||
for idx, cpuStat := range cpuStats {
|
||||
cs[idx] = &CPUStats{
|
||||
CPU: cpuStat.CPU,
|
||||
User: cpuStat.User,
|
||||
System: cpuStat.System,
|
||||
Idle: cpuStat.Idle,
|
||||
}
|
||||
}
|
||||
|
||||
hs := &HostStats{
|
||||
Memory: ms,
|
||||
CPU: cs,
|
||||
}
|
||||
return hs, nil
|
||||
}
|
|
@ -9,23 +9,25 @@ func (s *HTTPServer) StatsRequest(resp http.ResponseWriter, req *http.Request) (
|
|||
if s.agent.client == nil {
|
||||
return nil, clientNotRunning
|
||||
}
|
||||
cStatsReporter := s.agent.client.StatsReporter()
|
||||
var allocID, task string
|
||||
if allocID = req.URL.Query().Get("allocation"); allocID == "" {
|
||||
return nil, fmt.Errorf("provide a valid alloc id")
|
||||
return cStatsReporter.HostStats(), nil
|
||||
}
|
||||
statsReporter, err := s.agent.client.AllocStats(allocID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
allocStats := cStatsReporter.AllocStats()
|
||||
arStatsReporter, ok := allocStats[allocID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("alloc %q is not running on this client", allocID)
|
||||
}
|
||||
if task = req.URL.Query().Get("task"); task != "" {
|
||||
taskStatsReporter, err := statsReporter.TaskStats(task)
|
||||
taskStatsReporter, err := arStatsReporter.TaskStats(task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return taskStatsReporter.ResourceUsage(), nil
|
||||
}
|
||||
res := make(map[string]interface{})
|
||||
for task, sr := range statsReporter.AllocStats() {
|
||||
for task, sr := range arStatsReporter.AllocStats() {
|
||||
res[task] = sr.ResourceUsage()
|
||||
}
|
||||
return res, nil
|
||||
|
|
Loading…
Reference in New Issue