From afdaa979f7c7711827f2c502e83d5b1079ada1b2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 11 Dec 2016 22:33:12 -0800 Subject: [PATCH 1/9] Added a garbage collector for allocations --- client/client.go | 31 ++++++ client/gc.go | 189 ++++++++++++++++++++++++++++++++ client/gc_test.go | 46 ++++++++ command/agent/alloc_endpoint.go | 15 +++ command/agent/http.go | 1 + 5 files changed, 282 insertions(+) create mode 100644 client/gc.go create mode 100644 client/gc_test.go diff --git a/client/client.go b/client/client.go index 4f8c48320..5ef45fea7 100644 --- a/client/client.go +++ b/client/client.go @@ -154,6 +154,10 @@ type Client struct { // migratingAllocs is the set of allocs whose data migration is in flight migratingAllocs map[string]chan struct{} migratingAllocsLock sync.Mutex + + // garbageCollector is used to garbage collect terminal allocations present + // in the node automatically + garbageCollector *AllocGarbageCollector } var ( @@ -191,6 +195,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), + garbageCollector: NewAllocGarbageCollector(logger), } // Initialize the client @@ -434,6 +439,23 @@ func (c *Client) Stats() map[string]map[string]string { return stats } +// CollectAllocation garbage collects a single allocation +func (c *Client) CollectAllocation(allocID string) error { + if err := c.garbageCollector.Collect(allocID); err != nil { + return err + } + return nil +} + +// CollectAllAllocs garbage collects all allocations on a node in the terminal +// state +func (c *Client) CollectAllAllocs() error { + if err := c.garbageCollector.CollectAll(); err != nil { + return err + } + return nil +} + // Node returns the locally registered node func (c *Client) Node() *structs.Node { c.configLock.RLock() @@ -1088,6 +1110,15 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { } 
c.blockedAllocsLock.Unlock() + // Mark the allocation for GC if it is in terminal state + if alloc.Terminated() { + if ar, ok := c.getAllocRunners()[alloc.ID]; ok { + if err := c.garbageCollector.MarkForCollection(ar); err != nil { + c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err) + } + } + } + // Strip all the information that can be reconstructed at the server. Only // send the fields that are updatable by the client. stripped := new(structs.Allocation) diff --git a/client/gc.go b/client/gc.go new file mode 100644 index 000000000..acb369848 --- /dev/null +++ b/client/gc.go @@ -0,0 +1,189 @@ +package client + +import ( + "container/heap" + "fmt" + "log" + "sync" + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) + +type GCAlloc struct { + timeStamp time.Time + alloc *AllocRunner + index int +} + +type GCAllocPQImpl []*GCAlloc + +func (pq GCAllocPQImpl) Len() int { + return len(pq) +} + +func (pq GCAllocPQImpl) Less(i, j int) bool { + return pq[i].timeStamp.Before(pq[j].timeStamp) +} + +func (pq GCAllocPQImpl) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *GCAllocPQImpl) Push(x interface{}) { + n := len(*pq) + item := x.(*GCAlloc) + item.index = n + *pq = append(*pq, item) +} + +func (pq *GCAllocPQImpl) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// IndexedGCAllocPQ is an indexed PQ which maintains a list of allocation runner +// based on their termination time. 
+type IndexedGCAllocPQ struct { + index map[string]*GCAlloc + heap GCAllocPQImpl +} + +func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { + return &IndexedGCAllocPQ{ + index: make(map[string]*GCAlloc), + heap: make(GCAllocPQImpl, 0), + } +} + +func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { + alloc := ar.Alloc() + if _, ok := i.index[alloc.ID]; ok { + return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) + } + gcAlloc := &GCAlloc{ + timeStamp: time.Now(), + alloc: ar, + } + i.index[alloc.ID] = gcAlloc + heap.Push(&i.heap, gcAlloc) + return nil +} + +func (i *IndexedGCAllocPQ) Pop() *GCAlloc { + if len(i.heap) == 0 { + return nil + } + + gcAlloc := heap.Pop(&i.heap).(*GCAlloc) + delete(i.index, gcAlloc.alloc.Alloc().ID) + return gcAlloc +} + +func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { + if gcAlloc, ok := i.index[allocID]; ok { + heap.Remove(&i.heap, gcAlloc.index) + delete(i.index, allocID) + return gcAlloc, nil + } + + return nil, fmt.Errorf("alloc %q not present", allocID) +} + +func (i *IndexedGCAllocPQ) Length() int { + return len(i.heap) +} + +// AllocGarbageCollector garbage collects terminated allocations on a node +type AllocGarbageCollector struct { + allocRunners *IndexedGCAllocPQ + allocsLock sync.Mutex + logger *log.Logger +} + +// NewAllocGarbageCollector returns a garbage collector for terminated +// allocations on a node. 
+func NewAllocGarbageCollector(logger *log.Logger) *AllocGarbageCollector { + return &AllocGarbageCollector{ + allocRunners: NewIndexedGCAllocPQ(), + logger: logger, + } +} + +// Collect garbage collects a single allocation on a node +func (a *AllocGarbageCollector) Collect(allocID string) error { + gcAlloc, err := a.allocRunners.Remove(allocID) + if err != nil { + return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) + } + + ar := gcAlloc.alloc + a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) + ar.Destroy() + + return nil +} + +// CollectAll garbage collects all termianated allocations on a node +func (a *AllocGarbageCollector) CollectAll() error { + for { + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + ar := gcAlloc.alloc + a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) + ar.Destroy() + } + return nil +} + +// MakeRoomFor garbage collects enough number of allocations in the terminal +// state to make room for new allocations +func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { + totalResource := &structs.Resources{} + for _, alloc := range allocations { + if err := totalResource.Add(alloc.Resources); err != nil { + return err + } + } + + var diskCleared int + for { + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.alloc + alloc := ar.Alloc() + ar.Destroy() + diskCleared += alloc.Resources.DiskMB + if diskCleared >= totalResource.DiskMB { + break + } + } + return nil +} + +// MarkForCollection starts tracking an allocation for Garbage Collection +func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { + if ar == nil { + return fmt.Errorf("nil allocation runner inserted for garbage collection") + } + if ar.Alloc() == nil { + a.logger.Printf("[INFO] client: alloc is nil, so garbage collecting") + ar.Destroy() + } + + a.logger.Printf("[INFO] client: marking 
allocation %v for GC", ar.Alloc().ID) + a.allocsLock.Lock() + defer a.allocsLock.Unlock() + return a.allocRunners.Push(ar) +} diff --git a/client/gc_test.go b/client/gc_test.go new file mode 100644 index 000000000..606fa615b --- /dev/null +++ b/client/gc_test.go @@ -0,0 +1,46 @@ +package client + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/mock" +) + +func TestIndexedGCAllocPQ(t *testing.T) { + pq := NewIndexedGCAllocPQ() + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar3 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar4 := testAllocRunnerFromAlloc(mock.Alloc(), false) + + pq.Push(ar1) + pq.Push(ar2) + pq.Push(ar3) + pq.Push(ar4) + + allocID := pq.Pop().alloc.Alloc().ID + if allocID != ar1.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().alloc.Alloc().ID + if allocID != ar2.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().alloc.Alloc().ID + if allocID != ar3.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + allocID = pq.Pop().alloc.Alloc().ID + if allocID != ar4.Alloc().ID { + t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) + } + + gcAlloc := pq.Pop() + if gcAlloc != nil { + t.Fatalf("expected nil, got %v", gcAlloc) + } +} diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 19e2f67ed..40a62189b 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -79,11 +79,26 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return s.allocStats(allocID, resp, req) case "snapshot": return s.allocSnapshot(allocID, resp, req) + case "gc": + return s.allocGC(allocID, resp, req) } return nil, CodedError(404, resourceNotFoundErr) } +func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request) (interface{}, 
error) { + if s.agent.client == nil { + return nil, clientNotRunning + } + err := s.agent.Client().CollectAllAllocs() + return nil, err +} + +func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + err := s.agent.Client().CollectAllocation(allocID) + return nil, err +} + func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { allocFS, err := s.agent.Client().GetAllocFS(allocID) if err != nil { diff --git a/command/agent/http.go b/command/agent/http.go index 90fbbcdbb..5a31f7d4e 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -156,6 +156,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/client/fs/", s.wrap(s.FsRequest)) s.mux.HandleFunc("/v1/client/stats", s.wrap(s.ClientStatsRequest)) s.mux.HandleFunc("/v1/client/allocation/", s.wrap(s.ClientAllocRequest)) + s.mux.HandleFunc("/v1/client/gc", s.wrap(s.ClientGCRequest)) s.mux.HandleFunc("/v1/agent/self", s.wrap(s.AgentSelfRequest)) s.mux.HandleFunc("/v1/agent/join", s.wrap(s.AgentJoinRequest)) From 0ffd92668d5598c3f30226cd5940f92ab06b97b7 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 11 Dec 2016 22:40:11 -0800 Subject: [PATCH 2/9] GC-ing before we start a new allocation --- client/client.go | 5 +++++ client/gc.go | 18 +++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index 5ef45fea7..ffdfc9df9 100644 --- a/client/client.go +++ b/client/client.go @@ -1792,6 +1792,11 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo return nil } + // Make room for the allocation + if err := c.garbageCollector.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + c.logger.Printf("[ERR] client: error making room for allocation: %v", err) + } + c.configLock.RLock() ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, 
c.vaultClient) ar.SetPreviousAllocDir(prevAllocDir) diff --git a/client/gc.go b/client/gc.go index acb369848..2c2120832 100644 --- a/client/gc.go +++ b/client/gc.go @@ -11,9 +11,9 @@ import ( ) type GCAlloc struct { - timeStamp time.Time - alloc *AllocRunner - index int + timeStamp time.Time + allocRunner *AllocRunner + index int } type GCAllocPQImpl []*GCAlloc @@ -68,8 +68,8 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) } gcAlloc := &GCAlloc{ - timeStamp: time.Now(), - alloc: ar, + timeStamp: time.Now(), + allocRunner: ar, } i.index[alloc.ID] = gcAlloc heap.Push(&i.heap, gcAlloc) @@ -82,7 +82,7 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc { } gcAlloc := heap.Pop(&i.heap).(*GCAlloc) - delete(i.index, gcAlloc.alloc.Alloc().ID) + delete(i.index, gcAlloc.allocRunner.Alloc().ID) return gcAlloc } @@ -123,7 +123,7 @@ func (a *AllocGarbageCollector) Collect(allocID string) error { return fmt.Errorf("unable to collect allocation %q: %v", allocID, err) } - ar := gcAlloc.alloc + ar := gcAlloc.allocRunner a.logger.Printf("[INFO] client: garbage collecting allocation %q", ar.Alloc().ID) ar.Destroy() @@ -137,7 +137,7 @@ func (a *AllocGarbageCollector) CollectAll() error { if gcAlloc == nil { break } - ar := gcAlloc.alloc + ar := gcAlloc.allocRunner a.logger.Printf("[INFO] client: garbage collecting alloc runner for alloc %q", ar.Alloc().ID) ar.Destroy() } @@ -161,7 +161,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e break } - ar := gcAlloc.alloc + ar := gcAlloc.allocRunner alloc := ar.Alloc() ar.Destroy() diskCleared += alloc.Resources.DiskMB From e855cd587b7dd480a0fad45aec39b22d0e7e4dba Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 11 Dec 2016 22:58:28 -0800 Subject: [PATCH 3/9] Refactored hoststats collector --- client/client.go | 14 +++----------- client/gc.go | 1 + client/stats/host.go | 24 ++++++++++++++++++------ 3 files changed, 22 
insertions(+), 17 deletions(-) diff --git a/client/client.go b/client/client.go index ffdfc9df9..ba616f2b8 100644 --- a/client/client.go +++ b/client/client.go @@ -141,8 +141,6 @@ type Client struct { // HostStatsCollector collects host resource usage stats hostStatsCollector *stats.HostStatsCollector - resourceUsage *stats.HostStats - resourceUsageLock sync.RWMutex shutdown bool shutdownCh chan struct{} @@ -481,9 +479,7 @@ func (c *Client) GetAllocStats(allocID string) (AllocStatsReporter, error) { // HostStats returns all the stats related to a Nomad client func (c *Client) LatestHostStats() *stats.HostStats { - c.resourceUsageLock.RLock() - defer c.resourceUsageLock.RUnlock() - return c.resourceUsage + return c.hostStatsCollector.Stats() } // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation @@ -2104,20 +2100,16 @@ func (c *Client) collectHostStats() { for { select { case <-next.C: - ru, err := c.hostStatsCollector.Collect() + err := c.hostStatsCollector.Collect() next.Reset(c.config.StatsCollectionInterval) if err != nil { c.logger.Printf("[WARN] client: error fetching host resource usage stats: %v", err) continue } - c.resourceUsageLock.Lock() - c.resourceUsage = ru - c.resourceUsageLock.Unlock() - // Publish Node metrics if operator has opted in if c.config.PublishNodeMetrics { - c.emitStats(ru) + c.emitStats(c.hostStatsCollector.Stats()) } case <-c.shutdownCh: return diff --git a/client/gc.go b/client/gc.go index 2c2120832..eb66ab170 100644 --- a/client/gc.go +++ b/client/gc.go @@ -163,6 +163,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e ar := gcAlloc.allocRunner alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) ar.Destroy() diskCleared += alloc.Resources.DiskMB if diskCleared >= totalResource.DiskMB { diff --git a/client/stats/host.go b/client/stats/host.go index 49fb0fdcb..d1cd6ac8d 100644 --- a/client/stats/host.go +++ b/client/stats/host.go 
@@ -4,6 +4,7 @@ import ( "log" "math" "runtime" + "sync" "time" "github.com/shirou/gopsutil/cpu" @@ -58,6 +59,8 @@ type HostStatsCollector struct { numCores int statsCalculator map[string]*HostCpuStatsCalculator logger *log.Logger + hostStats *HostStats + hostStatsLock sync.RWMutex } // NewHostStatsCollector returns a HostStatsCollector @@ -73,11 +76,11 @@ func NewHostStatsCollector(logger *log.Logger) *HostStatsCollector { } // Collect collects stats related to resource usage of a host -func (h *HostStatsCollector) Collect() (*HostStats, error) { +func (h *HostStatsCollector) Collect() error { hs := &HostStats{Timestamp: time.Now().UTC().UnixNano()} memStats, err := mem.VirtualMemory() if err != nil { - return nil, err + return err } hs.Memory = &MemoryStats{ Total: memStats.Total, @@ -89,7 +92,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { ticksConsumed := 0.0 cpuStats, err := cpu.Times(true) if err != nil { - return nil, err + return err } cs := make([]*CPUStats, len(cpuStats)) for idx, cpuStat := range cpuStats { @@ -113,7 +116,7 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { partitions, err := disk.Partitions(false) if err != nil { - return nil, err + return err } var diskStats []*DiskStats for _, partition := range partitions { @@ -143,11 +146,20 @@ func (h *HostStatsCollector) Collect() (*HostStats, error) { uptime, err := host.Uptime() if err != nil { - return nil, err + return err } hs.Uptime = uptime - return hs, nil + h.hostStatsLock.Lock() + defer h.hostStatsLock.Unlock() + h.hostStats = hs + return nil +} + +func (h *HostStatsCollector) Stats() *HostStats { + h.hostStatsLock.RLock() + defer h.hostStatsLock.RUnlock() + return h.hostStats } // HostCpuStatsCalculator calculates cpu usage percentages From 7aef9bcabeb2f399449c97aaece49584d964e965 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Sun, 11 Dec 2016 23:08:13 -0800 Subject: [PATCH 4/9] Added the stats collector to GC --- client/client.go | 6 ++++-- 
client/gc.go | 15 +++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index ba616f2b8..596a319dc 100644 --- a/client/client.go +++ b/client/client.go @@ -177,6 +177,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg tlsWrap = tw } + statsCollector := stats.NewHostStatsCollector(logger) + // Create the client c := &Client{ config: cfg, @@ -184,7 +186,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), logger: logger, - hostStatsCollector: stats.NewHostStatsCollector(logger), + hostStatsCollector: statsCollector, allocs: make(map[string]*AllocRunner), blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), @@ -193,7 +195,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), - garbageCollector: NewAllocGarbageCollector(logger), + garbageCollector: NewAllocGarbageCollector(logger, statsCollector), } // Initialize the client diff --git a/client/gc.go b/client/gc.go index eb66ab170..9d4a2c8e2 100644 --- a/client/gc.go +++ b/client/gc.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" ) @@ -102,17 +103,19 @@ func (i *IndexedGCAllocPQ) Length() int { // AllocGarbageCollector garbage collects terminated allocations on a node type AllocGarbageCollector struct { - allocRunners *IndexedGCAllocPQ - allocsLock sync.Mutex - logger *log.Logger + allocRunners *IndexedGCAllocPQ + allocsLock sync.Mutex + statsCollector *stats.HostStatsCollector + logger *log.Logger } // NewAllocGarbageCollector returns a garbage collector for terminated // allocations on a node. 
-func NewAllocGarbageCollector(logger *log.Logger) *AllocGarbageCollector { +func NewAllocGarbageCollector(logger *log.Logger, statsCollector *stats.HostStatsCollector) *AllocGarbageCollector { return &AllocGarbageCollector{ - allocRunners: NewIndexedGCAllocPQ(), - logger: logger, + allocRunners: NewIndexedGCAllocPQ(), + statsCollector: statsCollector, + logger: logger, } } From 36b5545d6bc53bc34baee157e3c69d2c80fcac62 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 15 Dec 2016 23:54:54 -0800 Subject: [PATCH 5/9] Making the gc allocator understand real disk usage --- client/client.go | 12 ++-- client/gc.go | 133 ++++++++++++++++++++++++++++++++++++++++--- client/gc_test.go | 8 +-- client/stats/host.go | 62 ++++++++++++++------ 4 files changed, 181 insertions(+), 34 deletions(-) diff --git a/client/client.go b/client/client.go index 596a319dc..2a3aade7d 100644 --- a/client/client.go +++ b/client/client.go @@ -177,8 +177,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg tlsWrap = tw } - statsCollector := stats.NewHostStatsCollector(logger) - // Create the client c := &Client{ config: cfg, @@ -186,7 +184,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg start: time.Now(), connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), logger: logger, - hostStatsCollector: statsCollector, allocs: make(map[string]*AllocRunner), blockedAllocations: make(map[string]*structs.Allocation), allocUpdates: make(chan *structs.Allocation, 64), @@ -195,7 +192,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), - garbageCollector: NewAllocGarbageCollector(logger, statsCollector), } // Initialize the client @@ -203,6 +199,11 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg return nil, fmt.Errorf("failed to 
initialize client: %v", err) } + // Add the stats collector and the garbage collector + statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir) + c.hostStatsCollector = statsCollector + c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, cfg.Node.Reserved.DiskMB) + // Setup the node if err := c.setupNode(); err != nil { return nil, fmt.Errorf("node setup failed: %v", err) @@ -372,6 +373,9 @@ func (c *Client) Shutdown() error { c.vaultClient.Stop() } + // Stop Garbage collector + c.garbageCollector.Stop() + // Destroy all the running allocations. if c.config.DevMode { c.allocLock.Lock() diff --git a/client/gc.go b/client/gc.go index 9d4a2c8e2..2276b56c7 100644 --- a/client/gc.go +++ b/client/gc.go @@ -11,6 +11,20 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // diskUsageThreshold is the percent of used disk space beyond which Nomad + // GCs terminated allocations + diskUsageThreshold = 80 + + // gcInterval is the interval at which Nomad runs the garbage collector + gcInterval = 1 * time.Minute + + // inodeUsageThreshold is the percent of inode usage that Nomad tries to + // maintain, whenever we are over it we will attempt to GC terminal + // allocations + inodeUsageThreshold = 70 +) + type GCAlloc struct { timeStamp time.Time allocRunner *AllocRunner @@ -54,6 +68,8 @@ func (pq *GCAllocPQImpl) Pop() interface{} { type IndexedGCAllocPQ struct { index map[string]*GCAlloc heap GCAllocPQImpl + + pqLock sync.Mutex } func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { @@ -64,6 +80,9 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { } func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { + i.pqLock.Lock() + defer i.pqLock.Unlock() + alloc := ar.Alloc() if _, ok := i.index[alloc.ID]; ok { return fmt.Errorf("alloc %v already being tracked for GC", alloc.ID) @@ -78,6 +97,9 @@ func (i *IndexedGCAllocPQ) Push(ar *AllocRunner) error { } func (i *IndexedGCAllocPQ) Pop() *GCAlloc { + i.pqLock.Lock() + defer i.pqLock.Unlock() + 
if len(i.heap) == 0 { return nil } @@ -88,6 +110,9 @@ func (i *IndexedGCAllocPQ) Pop() *GCAlloc { } func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { + i.pqLock.Lock() + defer i.pqLock.Unlock() + if gcAlloc, ok := i.index[allocID]; ok { heap.Remove(&i.heap, gcAlloc.index) delete(i.index, allocID) @@ -98,25 +123,90 @@ func (i *IndexedGCAllocPQ) Remove(allocID string) (*GCAlloc, error) { } func (i *IndexedGCAllocPQ) Length() int { + i.pqLock.Lock() + defer i.pqLock.Unlock() + return len(i.heap) } // AllocGarbageCollector garbage collects terminated allocations on a node type AllocGarbageCollector struct { allocRunners *IndexedGCAllocPQ - allocsLock sync.Mutex - statsCollector *stats.HostStatsCollector + statsCollector stats.NodeStatsCollector + reservedDiskMB int logger *log.Logger + shutdownCh chan struct{} } // NewAllocGarbageCollector returns a garbage collector for terminated // allocations on a node. -func NewAllocGarbageCollector(logger *log.Logger, statsCollector *stats.HostStatsCollector) *AllocGarbageCollector { - return &AllocGarbageCollector{ +func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, reservedDiskMB int) *AllocGarbageCollector { + gc := &AllocGarbageCollector{ allocRunners: NewIndexedGCAllocPQ(), statsCollector: statsCollector, + reservedDiskMB: reservedDiskMB, logger: logger, + shutdownCh: make(chan struct{}), } + go gc.run() + + return gc +} + +func (a *AllocGarbageCollector) run() { + ticker := time.NewTicker(gcInterval) + for { + select { + case <-ticker.C: + if err := a.keepUsageBelowThreshold(); err != nil { + a.logger.Printf("[ERR] client: error GCing allocation: %v", err) + } + case <-a.shutdownCh: + ticker.Stop() + return + } + } +} + +// keepUsageBelowThreshold collects disk usage information and garbage collects +// allocations to make disk space available. 
+func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { + for { + // Check if we have enough free space + err := a.statsCollector.Collect() + if err != nil { + return err + } + + // See if we are below thresholds for used disk space and inode usage + diskStats := a.statsCollector.Stats().AllocDirStats + if diskStats.UsedPercent <= diskUsageThreshold && + diskStats.InodesUsedPercent <= inodeUsageThreshold { + break + } + + // Collect an allocation + gcAlloc := a.allocRunners.Pop() + if gcAlloc == nil { + break + } + + ar := gcAlloc.allocRunner + alloc := ar.Alloc() + a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits + ar.Destroy() + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: + } + } + return nil +} + +func (a *AllocGarbageCollector) Stop() { + a.shutdownCh <- struct{}{} } // Collect garbage collects a single allocation on a node @@ -157,8 +247,30 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e } } + // If the host has enough free space to accomodate the new allocations then + // we don't need to garbage collect terminated allocations + hostStats := a.statsCollector.Stats() + if hostStats != nil && uint64(totalResource.DiskMB*1024*1024) < hostStats.AllocDirStats.Available { + return nil + } + var diskCleared int for { + // Collect host stats and see if we still need to remove older + // allocations + if err := a.statsCollector.Collect(); err == nil { + hostStats := a.statsCollector.Stats() + if hostStats.AllocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) { + break + } + } else { + // Falling back to a simpler model to know if we have enough disk + // space if stats collection fails + if diskCleared >= totalResource.DiskMB { + break + } + } + gcAlloc := a.allocRunners.Pop() if gcAlloc == nil { break @@ -167,11 +279,16 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e ar := 
gcAlloc.allocRunner alloc := ar.Alloc() a.logger.Printf("[INFO] client: garbage collecting allocation %v", alloc.ID) + + // Destroy the alloc runner and wait until it exits ar.Destroy() - diskCleared += alloc.Resources.DiskMB - if diskCleared >= totalResource.DiskMB { - break + select { + case <-ar.WaitCh(): + case <-a.shutdownCh: } + + // Call stats collect again + diskCleared += alloc.Resources.DiskMB } return nil } @@ -187,7 +304,5 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { } a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) - a.allocsLock.Lock() - defer a.allocsLock.Unlock() return a.allocRunners.Push(ar) } diff --git a/client/gc_test.go b/client/gc_test.go index 606fa615b..c85b0ccd2 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -19,22 +19,22 @@ func TestIndexedGCAllocPQ(t *testing.T) { pq.Push(ar3) pq.Push(ar4) - allocID := pq.Pop().alloc.Alloc().ID + allocID := pq.Pop().allocRunner.Alloc().ID if allocID != ar1.Alloc().ID { t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) } - allocID = pq.Pop().alloc.Alloc().ID + allocID = pq.Pop().allocRunner.Alloc().ID if allocID != ar2.Alloc().ID { t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) } - allocID = pq.Pop().alloc.Alloc().ID + allocID = pq.Pop().allocRunner.Alloc().ID if allocID != ar3.Alloc().ID { t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) } - allocID = pq.Pop().alloc.Alloc().ID + allocID = pq.Pop().allocRunner.Alloc().ID if allocID != ar4.Alloc().ID { t.Fatalf("expected alloc %v, got %v", allocID, ar1.Alloc().ID) } diff --git a/client/stats/host.go b/client/stats/host.go index d1cd6ac8d..1583c3027 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -20,6 +20,7 @@ type HostStats struct { Memory *MemoryStats CPU []*CPUStats DiskStats []*DiskStats + AllocDirStats *DiskStats Uptime uint64 Timestamp int64 CPUTicksConsumed float64 @@ -53,6 +54,11 @@ type DiskStats struct { 
InodesUsedPercent float64 } +type NodeStatsCollector interface { + Collect() error + Stats() *HostStats +} + // HostStatsCollector collects host resource usage stats type HostStatsCollector struct { clkSpeed float64 @@ -61,16 +67,18 @@ type HostStatsCollector struct { logger *log.Logger hostStats *HostStats hostStatsLock sync.RWMutex + allocDir string } // NewHostStatsCollector returns a HostStatsCollector -func NewHostStatsCollector(logger *log.Logger) *HostStatsCollector { +func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollector { numCores := runtime.NumCPU() statsCalculator := make(map[string]*HostCpuStatsCalculator) collector := &HostStatsCollector{ statsCalculator: statsCalculator, numCores: numCores, logger: logger, + allocDir: allocDir, } return collector } @@ -125,25 +133,17 @@ func (h *HostStatsCollector) Collect() error { h.logger.Printf("[WARN] client: error fetching host disk usage stats for %v: %v", partition.Mountpoint, err) continue } - ds := DiskStats{ - Device: partition.Device, - Mountpoint: partition.Mountpoint, - Size: usage.Total, - Used: usage.Used, - Available: usage.Free, - UsedPercent: usage.UsedPercent, - InodesUsedPercent: usage.InodesUsedPercent, - } - if math.IsNaN(ds.UsedPercent) { - ds.UsedPercent = 0.0 - } - if math.IsNaN(ds.InodesUsedPercent) { - ds.InodesUsedPercent = 0.0 - } - diskStats = append(diskStats, &ds) + ds := h.toDiskStats(usage, &partition) + diskStats = append(diskStats, ds) } hs.DiskStats = diskStats + usage, err := disk.Usage(h.allocDir) + if err != nil { + return err + } + hs.AllocDirStats = h.toDiskStats(usage, nil) + uptime, err := host.Uptime() if err != nil { return err @@ -156,12 +156,40 @@ func (h *HostStatsCollector) Collect() error { return nil } +// Stats returns the host stats that has been collected func (h *HostStatsCollector) Stats() *HostStats { h.hostStatsLock.RLock() defer h.hostStatsLock.RUnlock() return h.hostStats } +// toDiskStats merges UsageStat and PartitionStat to 
create a DiskStat +func (h *HostStatsCollector) toDiskStats(usage *disk.UsageStat, partitionStat *disk.PartitionStat) *DiskStats { + if usage == nil { + return nil + } + ds := DiskStats{ + Size: usage.Total, + Used: usage.Used, + Available: usage.Free, + UsedPercent: usage.UsedPercent, + InodesUsedPercent: usage.InodesUsedPercent, + } + if math.IsNaN(ds.UsedPercent) { + ds.UsedPercent = 0.0 + } + if math.IsNaN(ds.InodesUsedPercent) { + ds.InodesUsedPercent = 0.0 + } + + if partitionStat != nil { + ds.Device = partitionStat.Device + ds.Mountpoint = partitionStat.Mountpoint + } + + return &ds +} + // HostCpuStatsCalculator calculates cpu usage percentages type HostCpuStatsCalculator struct { prevIdle float64 From e072961ceac5c44a65438cbdca10e5573c7ad4b4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 19 Dec 2016 10:51:57 -0800 Subject: [PATCH 6/9] Added tests --- client/gc.go | 31 +++++- client/gc_test.go | 279 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 305 insertions(+), 5 deletions(-) diff --git a/client/gc.go b/client/gc.go index 2276b56c7..190e7bbff 100644 --- a/client/gc.go +++ b/client/gc.go @@ -23,6 +23,9 @@ const ( // maintain, whenever we are over it we will attempt to GC terminal // allocations inodeUsageThreshold = 70 + + // MB is a constant which converts values in bytes to MB + MB = 1024 * 1024 ) type GCAlloc struct { @@ -180,6 +183,11 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { // See if we are below thresholds for used disk space and inode usage diskStats := a.statsCollector.Stats().AllocDirStats + + if diskStats == nil { + break + } + if diskStats.UsedPercent <= diskUsageThreshold && diskStats.InodesUsedPercent <= inodeUsageThreshold { break @@ -249,18 +257,31 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e // If the host has enough free space to accomodate the new allocations then // we don't need to garbage collect terminated allocations - hostStats := 
a.statsCollector.Stats() - if hostStats != nil && uint64(totalResource.DiskMB*1024*1024) < hostStats.AllocDirStats.Available { - return nil + if hostStats := a.statsCollector.Stats(); hostStats != nil { + var availableForAllocations uint64 + if hostStats.AllocDirStats.Available < uint64(a.reservedDiskMB*MB) { + availableForAllocations = 0 + } else { + availableForAllocations = hostStats.AllocDirStats.Available - uint64(a.reservedDiskMB*MB) + } + if uint64(totalResource.DiskMB*MB) < availableForAllocations { + return nil + } } var diskCleared int for { // Collect host stats and see if we still need to remove older // allocations + var allocDirStats *stats.DiskStats if err := a.statsCollector.Collect(); err == nil { - hostStats := a.statsCollector.Stats() - if hostStats.AllocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) { + if hostStats := a.statsCollector.Stats(); hostStats != nil { + allocDirStats = hostStats.AllocDirStats + } + } + + if allocDirStats != nil { + if allocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) { break } } else { diff --git a/client/gc_test.go b/client/gc_test.go index c85b0ccd2..5e3572023 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -1,9 +1,13 @@ package client import ( + "log" + "os" "testing" + "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" ) func TestIndexedGCAllocPQ(t *testing.T) { @@ -44,3 +48,278 @@ func TestIndexedGCAllocPQ(t *testing.T) { t.Fatalf("expected nil, got %v", gcAlloc) } } + +type MockStatsCollector struct { + availableValues []uint64 + usedPercents []float64 + inodePercents []float64 + index int +} + +func (m *MockStatsCollector) Collect() error { + return nil +} + +func (m *MockStatsCollector) Stats() *stats.HostStats { + if len(m.availableValues) == 0 { + return nil + } + + available := m.availableValues[m.index] + usedPercent := m.usedPercents[m.index] + inodePercent := m.inodePercents[m.index] + + 
if m.index < len(m.availableValues)-1 { + m.index = m.index + 1 + } + return &stats.HostStats{ + AllocDirStats: &stats.DiskStats{ + Available: available, + UsedPercent: usedPercent, + InodesUsedPercent: inodePercent, + }, + } +} + +func TestAllocGarbageCollector_MarkForCollection(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + + gcAlloc := gc.allocRunners.Pop() + if gcAlloc == nil || gcAlloc.allocRunner != ar1 { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_Collect(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + if err := gc.Collect(ar1.Alloc().ID); err != nil { + t.Fatalf("err: %v", err) + } + gcAlloc := gc.allocRunners.Pop() + if gcAlloc == nil || gcAlloc.allocRunner != ar2 { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_CollectAll(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + gc := NewAllocGarbageCollector(logger, &MockStatsCollector{}, 0) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + if err := gc.CollectAll(); err != nil { + t.Fatalf("err: %v", err) + } + gcAlloc := gc.allocRunners.Pop() + if gcAlloc != nil { + t.Fatalf("bad gcAlloc: %v", gcAlloc) + } +} + +func 
TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 200MB free out of which 20MB is reserved + statsCollector.availableValues = []uint64{200 * MB} + statsCollector.usedPercents = []float64{0} + statsCollector.inodePercents = []float64{0} + + alloc := mock.Alloc() + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // When we have enough disk available and don't need to do any GC so we + // should have two ARs in the GC queue + for i := 0; i < 2; i++ { + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 80MB and 175MB free in subsequent calls + statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 175 * MB} + statsCollector.usedPercents = []float64{0, 0, 0} + statsCollector.inodePercents = []float64{0, 0, 0} + + alloc := mock.Alloc() + if err := 
gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing one alloc + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + // Make stats collector report 80MB and 95MB free in subsequent calls + statsCollector.availableValues = []uint64{80 * MB, 80 * MB, 95 * MB} + statsCollector.usedPercents = []float64{0, 0, 0} + statsCollector.inodePercents = []float64{0, 0, 0} + + alloc := mock.Alloc() + if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing all the alloc runners + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + alloc := mock.Alloc() + if err := 
gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + + // We should be GC-ing one alloc + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } +} + +func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + statsCollector.availableValues = []uint64{1000} + statsCollector.usedPercents = []float64{20} + statsCollector.inodePercents = []float64{10} + + if err := gc.keepUsageBelowThreshold(); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { + logger := log.New(os.Stdout, "", 0) + statsCollector := &MockStatsCollector{} + gc := NewAllocGarbageCollector(logger, statsCollector, 20) + + _, ar1 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar1.waitCh) + _, ar2 := testAllocRunnerFromAlloc(mock.Alloc(), false) + close(ar2.waitCh) + if err := gc.MarkForCollection(ar1); err != nil { + t.Fatalf("err: %v", err) + } + if err := gc.MarkForCollection(ar2); err != nil { + t.Fatalf("err: %v", err) + } + + statsCollector.availableValues = []uint64{1000, 800} + statsCollector.usedPercents = []float64{85, 60} + statsCollector.inodePercents = []float64{50, 30} + + if err := gc.keepUsageBelowThreshold(); err != nil { + t.Fatalf("err: %v", err) + } +} From 6e6e0d364ad9053c9f1c45765ee7cb6f925de160 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 19 Dec 2016 
17:53:11 -0800 Subject: [PATCH 7/9] Added comments --- client/client.go | 10 ++-------- client/gc.go | 8 +++++--- client/gc_test.go | 22 ++++++++++++++++++++++ client/stats/host.go | 7 ++++++- command/agent/alloc_endpoint.go | 6 ++---- 5 files changed, 37 insertions(+), 16 deletions(-) diff --git a/client/client.go b/client/client.go index 2a3aade7d..80c3e81a3 100644 --- a/client/client.go +++ b/client/client.go @@ -445,19 +445,13 @@ func (c *Client) Stats() map[string]map[string]string { // CollectAllocation garbage collects a single allocation func (c *Client) CollectAllocation(allocID string) error { - if err := c.garbageCollector.Collect(allocID); err != nil { - return err - } - return nil + return c.garbageCollector.Collect(allocID) } // CollectAllAllocs garbage collects all allocations on a node in the terminal // state func (c *Client) CollectAllAllocs() error { - if err := c.garbageCollector.CollectAll(); err != nil { - return err - } - return nil + return c.garbageCollector.CollectAll() } // Node returns the locally registered node diff --git a/client/gc.go b/client/gc.go index 190e7bbff..9d9c4b43a 100644 --- a/client/gc.go +++ b/client/gc.go @@ -28,6 +28,8 @@ const ( MB = 1024 * 1024 ) +// GCAlloc wraps an allocation runner and an index enabling it to be used within +// a PQ type GCAlloc struct { timeStamp time.Time allocRunner *AllocRunner @@ -162,7 +164,7 @@ func (a *AllocGarbageCollector) run() { select { case <-ticker.C: if err := a.keepUsageBelowThreshold(); err != nil { - a.logger.Printf("[ERR] client: error GCing allocation: %v", err) + a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) } case <-a.shutdownCh: ticker.Stop() @@ -214,7 +216,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { } func (a *AllocGarbageCollector) Stop() { - a.shutdownCh <- struct{}{} + close(a.shutdownCh) } // Collect garbage collects a single allocation on a node @@ -281,7 +283,7 @@ func (a *AllocGarbageCollector) 
MakeRoomFor(allocations []*structs.Allocation) e } if allocDirStats != nil { - if allocDirStats.Available >= uint64(totalResource.DiskMB*1024*1024) { + if allocDirStats.Available >= uint64(totalResource.DiskMB*MB) { break } } else { diff --git a/client/gc_test.go b/client/gc_test.go index 5e3572023..8ae97a3cc 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -162,6 +162,7 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_EnoughSpace(t *testing.T) statsCollector.inodePercents = []float64{0} alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -197,6 +198,7 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Partial(t *testing.T) { statsCollector.inodePercents = []float64{0, 0, 0} alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -233,6 +235,7 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_All(t *testing.T) { statsCollector.inodePercents = []float64{0, 0, 0} alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -260,6 +263,7 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_GC_Fallback(t *testing.T) } alloc := mock.Alloc() + alloc.Resources.DiskMB = 150 if err := gc.MakeRoomFor([]*structs.Allocation{alloc}); err != nil { t.Fatalf("err: %v", err) } @@ -297,6 +301,14 @@ func TestAllocGarbageCollector_UsageBelowThreshold(t *testing.T) { if err := gc.keepUsageBelowThreshold(); err != nil { t.Fatalf("err: %v", err) } + + // We shouldn't GC any of the allocs since the used percent values are below + // threshold + for i := 0; i < 2; i++ { + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + } } func TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { @@ -322,4 +334,14 @@ func 
TestAllocGarbageCollector_UsedPercentThreshold(t *testing.T) { if err := gc.keepUsageBelowThreshold(); err != nil { t.Fatalf("err: %v", err) } + + // We should be GC-ing only one of the alloc runners since the second time + // used percent returns a number below threshold. + if gcAlloc := gc.allocRunners.Pop(); gcAlloc == nil { + t.Fatalf("err: %v", gcAlloc) + } + + if gcAlloc := gc.allocRunners.Pop(); gcAlloc != nil { + t.Fatalf("gcAlloc: %v", gcAlloc) + } } diff --git a/client/stats/host.go b/client/stats/host.go index 1583c3027..95f67c944 100644 --- a/client/stats/host.go +++ b/client/stats/host.go @@ -54,6 +54,8 @@ type DiskStats struct { InodesUsedPercent float64 } +// NodeStatsCollector is an interface which is used for the purposes of mocking +// the HostStatsCollector in the tests type NodeStatsCollector interface { Collect() error Stats() *HostStats @@ -70,7 +72,9 @@ type HostStatsCollector struct { allocDir string } -// NewHostStatsCollector returns a HostStatsCollector +// NewHostStatsCollector returns a HostStatsCollector.
The allocDir is passed in +// so that we can present the disk related statistics for the mountpoint where +// the allocation directory lives func NewHostStatsCollector(logger *log.Logger, allocDir string) *HostStatsCollector { numCores := runtime.NumCPU() statsCalculator := make(map[string]*HostCpuStatsCalculator) @@ -138,6 +142,7 @@ func (h *HostStatsCollector) Collect() error { } hs.DiskStats = diskStats + // Getting the disk stats for the allocation directory usage, err := disk.Usage(h.allocDir) if err != nil { return err diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 40a62189b..3afdfebad 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -90,13 +90,11 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request if s.agent.client == nil { return nil, clientNotRunning } - err := s.agent.Client().CollectAllAllocs() - return nil, err + return nil, s.agent.Client().CollectAllAllocs() } func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { - err := s.agent.Client().CollectAllocation(allocID) - return nil, err + return nil, s.agent.Client().CollectAllocation(allocID) } func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { From b6120e2fc8c24a1ec99a705f1f67a831b3cc7605 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 20 Dec 2016 11:14:22 -0800 Subject: [PATCH 8/9] Removing the alloc runner from GC if it is destroyed by the server --- client/client.go | 3 +++ client/gc.go | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/client/client.go b/client/client.go index 80c3e81a3..18fbb4ae8 100644 --- a/client/client.go +++ b/client/client.go @@ -1759,6 +1759,9 @@ func (c *Client) removeAlloc(alloc *structs.Allocation) error { delete(c.allocs, alloc.ID) c.allocLock.Unlock() + // Remove the allocrunner from garbage collector + 
c.garbageCollector.Remove(ar) + ar.Destroy() return nil } diff --git a/client/gc.go b/client/gc.go index 9d9c4b43a..0e41d0950 100644 --- a/client/gc.go +++ b/client/gc.go @@ -329,3 +329,15 @@ func (a *AllocGarbageCollector) MarkForCollection(ar *AllocRunner) error { a.logger.Printf("[INFO] client: marking allocation %v for GC", ar.Alloc().ID) return a.allocRunners.Push(ar) } + +// Remove removes an alloc runner without garbage collecting it +func (a *AllocGarbageCollector) Remove(ar *AllocRunner) { + if ar == nil || ar.Alloc() == nil { + return + } + + alloc := ar.Alloc() + if _, err := a.allocRunners.Remove(alloc.ID); err == nil { + a.logger.Printf("[INFO] client: removed alloc runner %v from garbage collector", alloc.ID) + } +} From 0b9b6003c2742e6e47c8d2a4f1eaf51e2138c33f Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 20 Dec 2016 11:15:47 -0800 Subject: [PATCH 9/9] Updated changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd27f461f..9f689b19a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.5.2 (Unreleased) +IMPROVEMENTS: + * client: Garbage collect Allocation Runners to free up disk resources + [GH-2081] + BUG FIXES: * client: Fixed a race condition and remove panic when handling duplicate allocations [GH-2096]