Merge pull request #2081 from hashicorp/f-gc

Garbage collector for allocations
Diptanu Choudhury 2016-12-20 11:19:32 -08:00 committed by GitHub
commit 6c11f38cb0
7 changed files with 820 additions and 36 deletions


@@ -6,6 +6,10 @@ BUG FIXES:
## 0.5.2 (Unreleased)
IMPROVEMENTS:
* client: Garbage collect Allocation Runners to free up disk resources
[GH-2081]
BUG FIXES:
* client: Fixed a race condition and removed a panic when handling duplicate
allocations [GH-2096]


@@ -141,8 +141,6 @@ type Client struct {
// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
resourceUsage *stats.HostStats
resourceUsageLock sync.RWMutex
shutdown bool
shutdownCh chan struct{}
@@ -154,6 +152,10 @@ type Client struct {
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]chan struct{}
migratingAllocsLock sync.Mutex
// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
}
var (
@@ -182,7 +184,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(logger),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
@@ -198,6 +199,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
return nil, fmt.Errorf("failed to initialize client: %v", err)
}
// Add the stats collector and the garbage collector
statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir)
c.hostStatsCollector = statsCollector
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB)
// Setup the node
if err := c.setupNode(); err != nil {
return nil, fmt.Errorf("node setup failed: %v", err)
@@ -367,6 +373,9 @@ func (c *Client) Shutdown() error {
c.vaultClient.Stop()
}
// Stop Garbage collector
c.garbageCollector.Stop()
// Destroy all the running allocations.
if c.config.DevMode {
c.allocLock.Lock()
@@ -434,6 +443,17 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}
// CollectAllocation garbage collects a single allocation
func (c *Client) CollectAllocation(allocID string) error {
return c.garbageCollector.Collect(allocID)
}
// CollectAllAllocs garbage collects all terminal allocations on a node
func (c *Client) CollectAllAllocs() error {
return c.garbageCollector.CollectAll()
}
// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
c.configLock.RLock()
@@ -459,9 +479,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) {
// LatestHostStats returns the latest host stats collected for this Nomad client
func (c *Client) LatestHostStats() *stats.HostStats {
c.resourceUsageLock.RLock()
defer c.resourceUsageLock.RUnlock()
return c.resourceUsage
return c.hostStatsCollector.Stats()
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
@@ -1088,6 +1106,15 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
}
c.blockedAllocsLock.Unlock()
// Mark the allocation for GC if it is in terminal state
if alloc.Terminated() {
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
if err := c.garbageCollector.MarkForCollection(ar); err != nil {
c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err)
}
}
}
// Strip all the information that can be reconstructed at the server. Only
// send the fields that are updatable by the client.
stripped := new(structs.Allocation)
@@ -1732,6 +1759,9 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error {
delete(c.allocs, alloc.ID)
c.allocLock.Unlock()
// Remove the alloc runner from the garbage collector
c.garbageCollector.Remove(ar)
ar.Destroy()
return nil
}
@@ -1761,6 +1791,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
return nil
}
// Make room for the allocation
if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
c.logger.Printf("[ERR] client: error making room for allocation: %v", err)
}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar.SetPreviousAllocDir(prevAllocDir)
@@ -2068,20 +2103,16 @@ func (c *Client) collectHostStats() {
for {
select {
case <-next.C:
ru, err := c.hostStatsCollector.Collect()
err := c.hostStatsCollector.Collect()
next.Reset(c.config.StatsCollectionInterval)
if err != nil {
c.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err)
continue
}
c.resourceUsageLock.Lock()
c.resourceUsage = ru
c.resourceUsageLock.Unlock()
// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitStats(ru)
c.emitStats(c.hostStatsCollector.Stats())
}
case <-c.shutdownCh:
return

client/gc.go (new file)

@@ -0,0 +1,343 @@
package client
import (
"container/heap"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// diskUsageThreshold is the percent of used disk space beyond which Nomad
// GCs terminated allocations
diskUsageThreshold = 80
// gcInterval is the interval at which Nomad runs the garbage collector
gcInterval = 1 * time.Minute
// inodeUsageThreshold is the percent of inode usage that Nomad tries to
// maintain; whenever we are over it we attempt to GC terminal
// allocations
inodeUsageThreshold = 70
// MB is a constant which converts values in MB to bytes
MB = 1024 * 1024
)
// GCAlloc wraps an allocation runner and an index enabling it to be used within
// a PQ
type GCAlloc struct {
timeStamp time.Time
allocRunner *AllocRunner
index int
}
type GCAllocPQImpl []*GCAlloc
func (pq GCAllocPQImpl) Len() int {
return len(pq)
}
func (pq GCAllocPQImpl) Less(i, j int) bool {
return pq[i].timeStamp.Before(pq[j].timeStamp)
}
func (pq GCAllocPQImpl) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *GCAllocPQImpl) Push(x interface{}) {
n := len(*pq)
item := x.(*GCAlloc)
item.index = n
*pq = append(*pq, item)
}
func (pq *GCAllocPQImpl) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// IndexedGCAllocPQ is an indexed PQ which maintains a list of allocation
// runners based on their termination time.
type IndexedGCAllocPQ struct {
index map[string]*GCAlloc
heap GCAllocPQImpl
pqLock sync.Mutex
}
func NewIndexedGCAllocPQ() *IndexedGCAllocPQ {
return &IndexedGCAllocPQ{
index: make(map[string]*GCAlloc),
heap: make(GCAllocPQImpl, 0),
}
}
func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error {
i.pqLock.Lock()
defer i.pqLock.Unlock()
alloc := ar.Alloc()
if _, ok := i.index[alloc.ID]; ok {
return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID)
}
gcAlloc := &GCAlloc{
timeStamp: time.Now(),
allocRunner: ar,
}
i.index[alloc.ID] = gcAlloc
heap.Push(&i.heap, gcAlloc)
return nil
}
func (i *IndexedGCAllocPQ) Pop() *GCAlloc {
i.pqLock.Lock()
defer i.pqLock.Unlock()
if len(i.heap) == 0 {
return nil
}
gcAlloc := heap.Pop(&i.heap).(*GCAlloc)
delete(i.index, gcAlloc.allocRunner.Alloc().ID)
return gcAlloc
}
func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) {
i.pqLock.Lock()
defer i.pqLock.Unlock()
if gcAlloc, ok := i.index[allocID]; ok {
heap.Remove(&i.heap, gcAlloc.index)
delete(i.index, allocID)
return gcAlloc, nil
}
return nil, fmt.Errorf("alloc %q not present", allocID)
}
func (i *IndexedGCAllocPQ) Length() int {
i.pqLock.Lock()
defer i.pqLock.Unlock()
return len(i.heap)
}
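GCAllocPQImpl implements the standard container/heap interface ordered by timestamp, and IndexedGCAllocPQ layers a map on top so an entry can be removed by alloc ID without scanning the heap. A minimal, self-contained sketch of the same oldest-first pattern, with hypothetical names and no AllocRunner dependency:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item mirrors GCAlloc: a payload plus the time it was marked for GC.
type item struct {
	id    string
	at    time.Time
	index int
}

// minHeap orders items oldest-first, like GCAllocPQImpl.
type minHeap []*item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].at.Before(h[j].at) }
func (h minHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}
func (h *minHeap) Push(x interface{}) {
	it := x.(*item)
	it.index = len(*h)
	*h = append(*h, it)
}
func (h *minHeap) Pop() interface{} {
	old := *h
	it := old[len(old)-1]
	it.index = -1
	*h = old[:len(old)-1]
	return it
}

func main() {
	h := &minHeap{}
	heap.Push(h, &item{id: "newer", at: time.Now()})
	heap.Push(h, &item{id: "older", at: time.Now().Add(-time.Hour)})

	// Pops come back oldest-first: "older", then "newer".
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(*item).id)
	}
}
```

Pushing entries in any order and popping them by timestamp is how the garbage collector below always evicts its oldest terminal allocation first.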
// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
statsCollector stats.NodeStatsCollector
reservedDiskMB int
logger *log.Logger
shutdownCh chan struct{}
}
// NewAllocGarbageCollector returns a garbage collector for terminated
// allocations on a node.
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, reservedDiskMB int) *AllocGarbageCollector {
gc := &AllocGarbageCollector{
allocRunners: NewIndexedGCAllocPQ(),
statsCollector: statsCollector,
reservedDiskMB: reservedDiskMB,
logger: logger,
shutdownCh: make(chan struct{}),
}
go gc.run()
return gc
}
func (a *AllocGarbageCollector) run() {
ticker := time.NewTicker(gcInterval)
for {
select {
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
return
}
}
}
// keepUsageBelowThreshold collects disk usage information and garbage collects
// allocations to make disk space available.
func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
for {
// Check if we have enough free space
err := a.statsCollector.Collect()
if err != nil {
return err
}
// See if we are below thresholds for used disk space and inode usage
diskStats := a.statsCollector.Stats().AllocDirStats
if diskStats == nil {
break
}
if diskStats.UsedPercent <= diskUsageThreshold &&
diskStats.InodesUsedPercent <= inodeUsageThreshold {
break
}
// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
}
ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
// Destroy the alloc runner and wait until it exits
ar.Destroy()
select {
case <-ar.WaitCh():
case <-a.shutdownCh:
}
}
return nil
}
func (a *AllocGarbageCollector) Stop() {
close(a.shutdownCh)
}
// Collect garbage collects a single allocation on a node
func (a *AllocGarbageCollector) Collect(allocID string) error {
gcAlloc, err := a.allocRunners.Remove(allocID)
if err != nil {
return fmt.Errorf("unable to collect allocation %q: %v", allocID, err)
}
ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID)
ar.Destroy()
return nil
}
// CollectAll garbage collects all terminated allocations on a node
func (a *AllocGarbageCollector) CollectAll() error {
for {
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
}
ar := gcAlloc.allocRunner
a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID)
ar.Destroy()
}
return nil
}
// MakeRoomFor garbage collects enough terminal allocations to make room for
// new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
totalResource := &structs.Resources{}
for _, alloc := range allocations {
if err := totalResource.Add(alloc.Resources); err != nil {
return err
}
}
// If the host has enough free space to accommodate the new allocations then
// we don't need to garbage collect terminated allocations
if hostStats := a.statsCollector.Stats(); hostStats != nil {
var availableForAllocations uint64
if hostStats.AllocDirStats.Available < uint64(a.reservedDiskMB*MB) {
availableForAllocations = 0
} else {
availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.reservedDiskMB*MB)
}
if uint64(totalResource.DiskMB*MB) < availableForAllocations {
return nil
}
}
var diskCleared int
for {
// Collect host stats and see if we still need to remove older
// allocations
var allocDirStats *stats.DiskStats
if err := a.statsCollector.Collect(); err == nil {
if hostStats := a.statsCollector.Stats(); hostStats != nil {
allocDirStats = hostStats.AllocDirStats
}
}
if allocDirStats != nil {
if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) {
break
}
} else {
// Fall back to a simpler model of tracking how much disk space has been
// cleared when stats collection fails
if diskCleared >= totalResource.DiskMB {
break
}
}
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
break
}
ar := gcAlloc.allocRunner
alloc := ar.Alloc()
a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID)
// Destroy the alloc runner and wait until it exits
ar.Destroy()
select {
case <-ar.WaitCh():
case <-a.shutdownCh:
}
// Track the disk space cleared so far for the fallback model
diskCleared += alloc.Resources.DiskMB
}
return nil
}
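MakeRoomFor only starts evicting when the disk requested by the incoming allocations (DiskMB converted to bytes) does not fit into the alloc dir's free space minus the node's reserved disk. A worked sketch of that check, reusing the figures from the EnoughSpace test below (200MB free, 20MB reserved, 150MB requested):

```go
package main

import "fmt"

const MB = 1024 * 1024

// Worked sketch of MakeRoomFor's space check using the EnoughSpace test's
// figures: 200MB free in the alloc dir, 20MB reserved on the node, and a new
// allocation asking for 150MB.
func main() {
	var (
		available      uint64 = 200 * MB // hostStats.AllocDirStats.Available
		reservedDiskMB        = 20       // a.reservedDiskMB (cfg.Node.Reserved.DiskMB)
		requestedMB           = 150      // totalResource.DiskMB
	)

	// Space usable by allocations is free space minus the reserved disk.
	availableForAllocs := uint64(0)
	if available > uint64(reservedDiskMB*MB) {
		availableForAllocs = available - uint64(reservedDiskMB*MB)
	}

	// GC is needed only if the request does not fit: 150MB < 180MB, so no GC.
	needGC := uint64(requestedMB*MB) >= availableForAllocs
	fmt.Printf("available for allocs: %dMB, need GC: %v\n", availableForAllocs/MB, needGC)
}
```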
// MarkForCollection starts tracking an allocation for Garbage Collection
func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error {
if ar == nil {
return fmt.Errorf("nil allocation runner inserted for garbage collection")
}
if ar.Alloc() == nil {
a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting")
ar.Destroy()
}
a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID)
return a.allocRunners.Push(ar)
}
// Remove removes an alloc runner without garbage collecting it
func (a *AllocGarbageCollector) Remove(ar *AllocRunner) {
if ar == nil || ar.Alloc() == nil {
return
}
alloc := ar.Alloc()
if _, err := a.allocRunners.Remove(alloc.ID); err == nil {
a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID)
}
}

client/gc_test.go (new file)

@@ -0,0 +1,347 @@
package client
import (
"log"
"os"
"testing"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestIndexedGCAllocPQ(t *testing.T) {
pq := NewIndexedGCAllocPQ()
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar3 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar4 := testAllocRunnerFromAlloc(mock.Alloc(), false)
pq.Push(ar1)
pq.Push(ar2)
pq.Push(ar3)
pq.Push(ar4)
allocID := pq.Pop().allocRunner.Alloc().ID
if allocID != ar1.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", ar1.Alloc().ID, allocID)
}
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar2.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", ar2.Alloc().ID, allocID)
}
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar3.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", ar3.Alloc().ID, allocID)
}
allocID = pq.Pop().allocRunner.Alloc().ID
if allocID != ar4.Alloc().ID {
t.Fatalf("expected alloc %v, got %v", ar4.Alloc().ID, allocID)
}
gcAlloc := pq.Pop()
if gcAlloc != nil {
t.Fatalf("expected nil, got %v", gcAlloc)
}
}
type MockStatsCollector struct {
availableValues []uint64
usedPercents []float64
inodePercents []float64
index int
}
func (m *MockStatsCollector) Collect() error {
return nil
}
func (m *MockStatsCollector) Stats() *stats.HostStats {
if len(m.availableValues) == 0 {
return nil
}
available := m.availableValues[m.index]
usedPercent := m.usedPercents[m.index]
inodePercent := m.inodePercents[m.index]
if m.index < len(m.availableValues)-1 {
m.index = m.index + 1
}
return &stats.HostStats{
AllocDirStats: &stats.DiskStats{
Available: available,
UsedPercent: usedPercent,
InodesUsedPercent: inodePercent,
},
}
}
func TestAllocGarbageCollector_MarkForCollection(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
gcAlloc := gc.allocRunners.Pop()
if gcAlloc == nil || gcAlloc.allocRunner != ar1 {
t.Fatalf("bad gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_Collect(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.Collect(ar1.Alloc().ID); err != nil {
t.Fatalf("err: %v", err)
}
gcAlloc := gc.allocRunners.Pop()
if gcAlloc == nil || gcAlloc.allocRunner != ar2 {
t.Fatalf("bad gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_CollectAll(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.CollectAll(); err != nil {
t.Fatalf("err: %v", err)
}
gcAlloc := gc.allocRunners.Pop()
if gcAlloc != nil {
t.Fatalf("bad gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
// Make stats collector report 200MB free out of which 20MB is reserved
statsCollector.availableValues = []uint64{200 * MB}
statsCollector.usedPercents = []float64{0}
statsCollector.inodePercents = []float64{0}
alloc := mock.Alloc()
alloc.Resources.DiskMB = 150
if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// Since there is enough disk space available we don't need to GC, so both
// alloc runners should still be in the GC queue
for i := 0; i < 2; i++ {
if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
t.Fatalf("err: %v", gcAlloc)
}
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
// Make stats collector report 80MB free for the first two calls and then 175MB
statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 175 * MB}
statsCollector.usedPercents = []float64{0, 0, 0}
statsCollector.inodePercents = []float64{0, 0, 0}
alloc := mock.Alloc()
alloc.Resources.DiskMB = 150
if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// We should be GC-ing one alloc
if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
t.Fatalf("err: %v", gcAlloc)
}
if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
// Make stats collector report 80MB free for the first two calls and then 95MB
statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 95 * MB}
statsCollector.usedPercents = []float64{0, 0, 0}
statsCollector.inodePercents = []float64{0, 0, 0}
alloc := mock.Alloc()
alloc.Resources.DiskMB = 150
if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// We should be GC-ing all the alloc runners
if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
alloc := mock.Alloc()
alloc.Resources.DiskMB = 150
if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// We should be GC-ing one alloc
if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
t.Fatalf("err: %v", gcAlloc)
}
if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}
func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
statsCollector.availableValues = []uint64{1000}
statsCollector.usedPercents = []float64{20}
statsCollector.inodePercents = []float64{10}
if err := gc.keepUsageBelowThreshold(); err != nil {
t.Fatalf("err: %v", err)
}
// We shouldn't GC any of the allocs since the used percent values are below
// threshold
for i := 0; i < 2; i++ {
if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
t.Fatalf("err: %v", gcAlloc)
}
}
}
func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) {
logger := log.New(os.Stdout, "", 0)
statsCollector := &MockStatsCollector{}
gc := NewAllocGarbageCollector(logger, statsCollector, 20)
_, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar1.waitCh)
_, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false)
close(ar2.waitCh)
if err := gc.MarkForCollection(ar1); err != nil {
t.Fatalf("err: %v", err)
}
if err := gc.MarkForCollection(ar2); err != nil {
t.Fatalf("err: %v", err)
}
statsCollector.availableValues = []uint64{1000, 800}
statsCollector.usedPercents = []float64{85, 60}
statsCollector.inodePercents = []float64{50, 30}
if err := gc.keepUsageBelowThreshold(); err != nil {
t.Fatalf("err: %v", err)
}
// We should be GC-ing only one of the alloc runners since the second time
// used percent returns a number below threshold.
if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil {
t.Fatalf("err: %v", gcAlloc)
}
if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil {
t.Fatalf("gcAlloc: %v", gcAlloc)
}
}


@@ -4,6 +4,7 @@ import (
"log"
"math"
"runtime"
"sync"
"time"
"github.com/shirou/gopsutil/cpu"
@@ -19,6 +20,7 @@ type HostStats struct {
Memory *MemoryStats
CPU []*CPUStats
DiskStats []*DiskStats
AllocDirStats *DiskStats
Uptime uint64
Timestamp int64
CPUTicksConsumed float64
@@ -52,32 +54,45 @@ type DiskStats struct {
InodesUsedPercent float64
}
// NodeStatsCollector is an interface which is used for the purposes of
// mocking the HostStatsCollector in the tests
type NodeStatsCollector interface {
Collect() error
Stats() *HostStats
}
// HostStatsCollector collects host resource usage stats
type HostStatsCollector struct {
clkSpeed float64
numCores int
statsCalculator map[string]*HostCpuStatsCalculator
logger *log.Logger
hostStats *HostStats
hostStatsLock sync.RWMutex
allocDir string
}
// NewHostStatsCollector returns a HostStatsCollector
func NewHostStatsCollector(logger *log.Logger) *HostStatsCollector {
// NewHostStatsCollector returns a HostStatsCollector. The allocDir is passed in
// so that we can present the disk related statistics for the mountpoint where
// the allocation directory lives
func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollector {
numCores := runtime.NumCPU()
statsCalculator := make(map[string]*HostCpuStatsCalculator)
collector := &HostStatsCollector{
statsCalculator: statsCalculator,
numCores: numCores,
logger: logger,
allocDir: allocDir,
}
return collector
}
// Collect collects stats related to resource usage of a host
func (h *HostStatsCollector) Collect() (*HostStats, error) {
func (h *HostStatsCollector) Collect() error {
hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()}
memStats, err := mem.VirtualMemory()
if err != nil {
return nil, err
return err
}
hs.Memory = &MemoryStats{
Total: memStats.Total,
@@ -89,7 +104,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) {
ticksConsumed := 0.0
cpuStats, err := cpu.Times(true)
if err != nil {
return nil, err
return err
}
cs := make([]*CPUStats, len(cpuStats))
for idx, cpuStat := range cpuStats {
@@ -113,7 +128,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) {
partitions, err := disk.Partitions(false)
if err != nil {
return nil, err
return err
}
var diskStats []*DiskStats
for _, partition := range partitions {
@@ -122,32 +137,62 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) {
h.logger.Printf("[WARN] client: error fetching host disk usage stats for %v: %v", partition.Mountpoint, err)
continue
}
ds := DiskStats{
Device: partition.Device,
Mountpoint: partition.Mountpoint,
Size: usage.Total,
Used: usage.Used,
Available: usage.Free,
UsedPercent: usage.UsedPercent,
InodesUsedPercent: usage.InodesUsedPercent,
}
if math.IsNaN(ds.UsedPercent) {
ds.UsedPercent = 0.0
}
if math.IsNaN(ds.InodesUsedPercent) {
ds.InodesUsedPercent = 0.0
}
diskStats = append(diskStats, &ds)
ds := h.toDiskStats(usage, &partition)
diskStats = append(diskStats, ds)
}
hs.DiskStats = diskStats
// Getting the disk stats for the allocation directory
usage, err := disk.Usage(h.allocDir)
if err != nil {
return err
}
hs.AllocDirStats = h.toDiskStats(usage, nil)
uptime, err := host.Uptime()
if err != nil {
return nil, err
return err
}
hs.Uptime = uptime
return hs, nil
h.hostStatsLock.Lock()
defer h.hostStatsLock.Unlock()
h.hostStats = hs
return nil
}
// Stats returns the host stats that have been collected
func (h *HostStatsCollector) Stats() *HostStats {
h.hostStatsLock.RLock()
defer h.hostStatsLock.RUnlock()
return h.hostStats
}
// toDiskStats merges a UsageStat and a PartitionStat to create a DiskStats
func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats {
if usage == nil {
return nil
}
ds := DiskStats{
Size: usage.Total,
Used: usage.Used,
Available: usage.Free,
UsedPercent: usage.UsedPercent,
InodesUsedPercent: usage.InodesUsedPercent,
}
if math.IsNaN(ds.UsedPercent) {
ds.UsedPercent = 0.0
}
if math.IsNaN(ds.InodesUsedPercent) {
ds.InodesUsedPercent = 0.0
}
if partitionStat != nil {
ds.Device = partitionStat.Device
ds.Mountpoint = partitionStat.Mountpoint
}
return &ds
}
// HostCpuStatsCalculator calculates cpu usage percentages
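With this change, Collect no longer returns the sample; callers refresh via Collect and read the cached result via Stats. A minimal sketch of that pattern against the new collector API, assuming "/tmp" stands in for the client's real allocation directory:

```go
package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/nomad/client/stats"
)

// Minimal sketch of the new Collect/Stats split: Collect refreshes the cached
// sample under the lock, Stats reads it. "/tmp" is only a stand-in for the
// client's real allocation directory.
func main() {
	logger := log.New(os.Stdout, "", log.LstdFlags)
	collector := stats.NewHostStatsCollector(logger, "/tmp")

	for i := 0; i < 3; i++ {
		if err := collector.Collect(); err != nil {
			logger.Printf("collect failed: %v", err)
		}
		if hs := collector.Stats(); hs != nil && hs.AllocDirStats != nil {
			logger.Printf("alloc dir: %.1f%% used, %d bytes available",
				hs.AllocDirStats.UsedPercent, hs.AllocDirStats.Available)
		}
		time.Sleep(time.Second)
	}
}
```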


@@ -79,11 +79,24 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
return s.allocStats(allocID, resp, req)
case "snapshot":
return s.allocSnapshot(allocID, resp, req)
case "gc":
return s.allocGC(allocID, resp, req)
}
return nil, CodedError(404, resourceNotFoundErr)
}
func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.client == nil {
return nil, clientNotRunning
}
return nil, s.agent.Client().CollectAllAllocs()
}
func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
return nil, s.agent.Client().CollectAllocation(allocID)
}
func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
allocFS, err := s.agent.Client().GetAllocFS(allocID)
if err != nil {


@@ -156,6 +156,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/client/fs/", s.wrap(s.FsRequest))
s.mux.HandleFunc("/v1/client/stats", s.wrap(s.ClientStatsRequest))
s.mux.HandleFunc("/v1/client/allocation/", s.wrap(s.ClientAllocRequest))
s.mux.HandleFunc("/v1/client/gc", s.wrap(s.ClientGCRequest))
s.mux.HandleFunc("/v1/agent/self", s.wrap(s.AgentSelfRequest))
s.mux.HandleFunc("/v1/agent/join", s.wrap(s.AgentJoinRequest))