Changing the api of the stats endpoints

This commit is contained in:
Diptanu Choudhury 2016-05-27 14:15:51 -07:00
parent fa9b0dd7e8
commit c0dc6cfbf2
9 changed files with 97 additions and 23 deletions

View file

@ -2,8 +2,6 @@ package api
import (
"fmt"
"io/ioutil"
"net/http"
"sort"
"time"

View file

@ -91,8 +91,11 @@ type ClientStatsReporter interface {
// collector
AllocStats() map[string]AllocStatsReporter
// HostStats returns a stats collector for the host
HostStats() *stats.HostStats
// HostStats returns resource usage stats for the host
HostStats() []*stats.HostStats
// HostStatsTS returns a time series of host resource usage stats
HostStatsTS(since int64) []*stats.HostStats
}
// Client is used to implement the client interaction with Nomad. Clients
@ -441,10 +444,47 @@ func (c *Client) AllocStats() map[string]AllocStatsReporter {
}
// HostStats returns all the stats related to a Nomad client
func (c *Client) HostStats() *stats.HostStats {
func (c *Client) HostStats() []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
val := c.resourceUsage.Peek()
ru, _ := val.(*stats.HostStats)
return ru
return []*stats.HostStats{ru}
}
func (c *Client) HostStatsTS(since int64) []*stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
values := c.resourceUsage.Values()
low := 0
high := len(values) - 1
var idx int
for {
mid := (low + high) >> 1
midVal, _ := values[mid].(*stats.HostStats)
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp == since {
idx = mid
break
}
if low > high {
idx = low
break
}
}
values = values[idx:]
ts := make([]*stats.HostStats, len(values))
for index, val := range values {
ru, _ := val.(*stats.HostStats)
ts[index] = ru
}
return ts
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation

View file

@ -1006,7 +1006,7 @@ func (h *DockerHandle) collectStats() {
MemoryStats: ms,
CpuStats: cs,
},
Timestamp: s.Read,
Timestamp: s.Read.UTC().UnixNano(),
}
h.resourceUsageLock.Unlock()
}

View file

@ -488,7 +488,6 @@ func (e *UniversalExecutor) DeregisterServices() error {
// pidStats returns the resource usage stats per pid
func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, error) {
stats := make(map[string]*cstructs.ResourceUsage)
ts := time.Now()
e.pidLock.RLock()
pids := make([]*nomadPid, len(e.pids))
copy(pids, e.pids)
@ -513,7 +512,7 @@ func (e *UniversalExecutor) pidStats() (map[string]*cstructs.ResourceUsage, erro
// calculate cpu usage percent
cs.Percent = pid.cpuStats.Percent(cpuStats.Total())
}
stats[strconv.Itoa(pid.pid)] = &cstructs.ResourceUsage{MemoryStats: ms, CpuStats: cs, Timestamp: ts}
stats[strconv.Itoa(pid.pid)] = &cstructs.ResourceUsage{MemoryStats: ms, CpuStats: cs}
}
return stats, nil
@ -748,7 +747,7 @@ func (e *UniversalExecutor) scanPids(parentPid int, allPids []ps.Process) ([]*no
// aggregatedResourceUsage aggregates the resource usage of all the pids and
// returns a TaskResourceUsage data point
func (e *UniversalExecutor) aggregatedResourceUsage(pidStats map[string]*cstructs.ResourceUsage) *cstructs.TaskResourceUsage {
ts := time.Now()
ts := time.Now().UTC().UnixNano()
var (
systemModeCPU, userModeCPU, percent float64
totalRSS, totalSwap uint64
@ -777,7 +776,6 @@ func (e *UniversalExecutor) aggregatedResourceUsage(pidStats map[string]*cstruct
resourceUsage := cstructs.ResourceUsage{
MemoryStats: totalMemory,
CpuStats: totalCPU,
Timestamp: ts,
}
return &cstructs.TaskResourceUsage{
ResourceUsage: &resourceUsage,

View file

@ -108,13 +108,12 @@ type CpuStats struct {
type ResourceUsage struct {
MemoryStats *MemoryStats
CpuStats *CpuStats
Timestamp time.Time
}
// TaskResourceUsage holds aggregated resource usage of all processes in a Task
// and the resource usage of the individual pids
type TaskResourceUsage struct {
ResourceUsage *ResourceUsage
Timestamp time.Time
Timestamp int64
Pids map[string]*ResourceUsage
}

View file

@ -1,6 +1,8 @@
package stats
import (
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host"
@ -13,6 +15,7 @@ type HostStats struct {
CPU []*CPUStats
DiskStats []*DiskStats
Uptime uint64
Timestamp int64
}
// MemoryStats represnts stats related to virtual memory usage
@ -56,7 +59,7 @@ func NewHostStatsCollector() *HostStatsCollector {
// Collect collects stats related to resource usage of a host
func (h *HostStatsCollector) Collect() (*HostStats, error) {
hs := &HostStats{}
hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}
if memStats, err := mem.VirtualMemory(); err == nil {
ms := &MemoryStats{
Total: memStats.Total,

View file

@ -39,11 +39,11 @@ const (
type TaskStatsReporter interface {
// ResourceUsage returns the latest resource usage data point collected for
// the task
ResourceUsage() *cstructs.TaskResourceUsage
ResourceUsage() []*cstructs.TaskResourceUsage
// ResourceUsageTS returns all the resource usage data points since a given
// time
ResourceUsageTS(since time.Time) []*cstructs.TaskResourceUsage
ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage
}
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
@ -488,17 +488,17 @@ func (r *TaskRunner) StatsReporter() TaskStatsReporter {
}
// ResourceUsage returns the last resource utilization datapoint collected
func (r *TaskRunner) ResourceUsage() *cstructs.TaskResourceUsage {
func (r *TaskRunner) ResourceUsage() []*cstructs.TaskResourceUsage {
r.resourceUsageLock.RLock()
defer r.resourceUsageLock.RUnlock()
val := r.resourceUsage.Peek()
ru, _ := val.(*cstructs.TaskResourceUsage)
return ru
return []*cstructs.TaskResourceUsage{ru}
}
// ResourceUsageTS returns the list of all the resource utilization datapoints
// collected
func (r *TaskRunner) ResourceUsageTS(since time.Time) []*cstructs.TaskResourceUsage {
func (r *TaskRunner) ResourceUsageTS(since int64) []*cstructs.TaskResourceUsage {
r.resourceUsageLock.RLock()
defer r.resourceUsageLock.RUnlock()
@ -510,11 +510,11 @@ func (r *TaskRunner) ResourceUsageTS(since time.Time) []*cstructs.TaskResourceUs
for {
mid := (low + high) >> 1
midVal, _ := values[mid].(*cstructs.TaskResourceUsage)
if midVal.Timestamp.UnixNano() < since.UnixNano() {
if midVal.Timestamp < since {
low = mid + 1
} else if midVal.Timestamp.UnixNano() > since.UnixNano() {
} else if midVal.Timestamp > since {
high = mid - 1
} else if midVal.Timestamp.UnixNano() == since.UnixNano() {
} else if midVal.Timestamp == since {
idx = mid
break
}

View file

@ -1,7 +1,9 @@
package agent
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/hashicorp/nomad/nomad/structs"
@ -75,11 +77,25 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
return nil, CodedError(404, "alloc not running on node")
}
var since int
var err error
ts := false
if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
ts = true
since, err = strconv.Atoi(sinceTime)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err))
}
}
if task := req.URL.Query().Get("task"); task != "" {
taskStats, ok := allocStats.AllocStats()[task]
if !ok {
return nil, CodedError(404, "task not present in allocation")
}
if ts {
return taskStats.ResourceUsageTS(int64(since)), nil
}
return taskStats.ResourceUsage(), nil
}
@ -87,7 +103,11 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
// is not specified
res := make(map[string]interface{})
for task, taskStats := range allocStats.AllocStats() {
res[task] = taskStats.ResourceUsage()
if ts {
res[task] = taskStats.ResourceUsageTS(int64(since))
} else {
res[task] = taskStats.ResourceUsage()
}
}
return res, nil
}

View file

@ -1,7 +1,9 @@
package agent
import (
"fmt"
"net/http"
"strconv"
)
func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -9,6 +11,20 @@ func (s *HTTPServer) ClientStatsRequest(resp http.ResponseWriter, req *http.Requ
return nil, clientNotRunning
}
var since int
var err error
ts := false
if sinceTime := req.URL.Query().Get("since"); sinceTime != "" {
ts = true
since, err = strconv.Atoi(sinceTime)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("can't read the since query parameter: %v", err))
}
}
clientStats := s.agent.client.StatsReporter()
if ts {
return clientStats.HostStatsTS(int64(since)), nil
}
return clientStats.HostStats(), nil
}