Add allocated/unallocated metrics to client

This commit is contained in:
Alex Dadgar 2017-02-16 18:28:11 -08:00
parent 92419929a8
commit 7203dee7ab
2 changed files with 121 additions and 7 deletions

View file

@ -135,6 +135,10 @@ type Client struct {
blockedAllocations map[string]*structs.Allocation
blockedAllocsLock sync.RWMutex
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
@ -151,10 +155,6 @@ type Client struct {
// vaultClient is used to interact with Vault for token and secret renewals
vaultClient vaultclient.VaultClient
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.Mutex
// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
@ -162,14 +162,16 @@ type Client struct {
// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
alloc *structs.Allocation
ch chan struct{}
closed bool
chLock sync.Mutex
}
func newMigrateAllocCtrl() *migrateAllocCtrl {
func newMigrateAllocCtrl(alloc *structs.Allocation) *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
ch: make(chan struct{}),
alloc: alloc,
}
}
@ -1501,7 +1503,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
c.migratingAllocs[add.ID] = newMigrateAllocCtrl(add)
go c.blockForRemoteAlloc(add)
}
}
@ -2220,6 +2222,111 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}
// Get all the resources for the node
c.configLock.RLock()
node := c.configCopy.Node
c.configLock.RUnlock()
total := node.Resources
res := node.Reserved
allocated := c.getAllocatedResources(node)
// Emit allocated
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
for _, n := range allocated.Networks {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
}
// Emit unallocated
unallocatedMem := total.MemoryMB - res.MemoryMB - allocated.MemoryMB
unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB
unallocatedCpu := total.CPU - res.CPU - allocated.CPU
unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))
for _, n := range allocated.Networks {
totalMbits := 0
totalIdx := total.NetIndex(n)
if totalIdx != -1 {
totalMbits = total.Networks[totalIdx].MBits
continue
}
unallocatedMbits := totalMbits - n.MBits
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
}
}
func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
// Unfortunately the allocs only have IP so we need to match them to the
// device
cidrToDevice := make(map[*net.IPNet]string, len(selfNode.Resources.Networks))
for _, n := range selfNode.Resources.Networks {
_, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
cidrToDevice[ipnet] = n.Device
}
// Sum the allocated resources
allocs := c.allAllocs()
var allocated structs.Resources
allocatedDeviceMbits := make(map[string]int)
for _, alloc := range allocs {
if !alloc.TerminalStatus() {
allocated.Add(alloc.Resources)
for _, allocatedNetwork := range alloc.Resources.Networks {
for cidr, dev := range cidrToDevice {
ip := net.ParseIP(allocatedNetwork.IP)
if cidr.Contains(ip) {
allocatedDeviceMbits[dev] += allocatedNetwork.MBits
break
}
}
}
}
}
// Clear the networks
allocated.Networks = nil
for dev, speed := range allocatedDeviceMbits {
net := &structs.NetworkResource{
Device: dev,
MBits: speed,
}
allocated.Networks = append(allocated.Networks, net)
}
return &allocated
}
// allAllocs returns all the allocations managed by the client
func (c *Client) allAllocs() []*structs.Allocation {
var allocs []*structs.Allocation
for _, ar := range c.getAllocRunners() {
allocs = append(allocs, ar.Alloc())
}
c.blockedAllocsLock.Lock()
for _, alloc := range c.blockedAllocations {
allocs = append(allocs, alloc)
}
c.blockedAllocsLock.Unlock()
c.migratingAllocsLock.Lock()
for _, ctrl := range c.migratingAllocs {
allocs = append(allocs, ctrl.alloc)
}
c.migratingAllocsLock.Unlock()
return allocs
}
// resolveServer given a sever's address as a string, return it's resolved

View file

@ -22,9 +22,16 @@ client {
}
reserved {
cpu = 500
memory = 512
disk = 1024
}
}
telemetry {
publish_allocation_metrics = true
publish_node_metrics = true
}
# Modify our port to avoid a collision with server1
ports {
http = 5656