From 73e9b5790899e64e6ef8c3c0159137f963d2c03e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 26 Oct 2017 14:03:51 -0700 Subject: [PATCH] Trigger GCs after alloc changes GC much more aggressively by triggering GCs when allocations become terminal as well as after new allocations are added. --- client/alloc_runner.go | 6 ++++- client/client.go | 8 ++++++ client/gc.go | 61 ++++++++++++++++++++++++++++++++---------- client/gc_test.go | 19 ++++++++++--- 4 files changed, 75 insertions(+), 19 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 54ed06f98..1a396ac05 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -921,7 +921,6 @@ func (r *AllocRunner) handleDestroy() { // Final state sync. We do this to ensure that the server has the correct // state as we wait for a destroy. alloc := r.Alloc() - r.updater(alloc) // Broadcast and persist state synchronously r.sendBroadcast(alloc) @@ -936,6 +935,11 @@ func (r *AllocRunner) handleDestroy() { r.logger.Printf("[ERR] client: alloc %q unable unmount task directories: %v", r.allocID, err) } + // Update the server with the alloc's status -- also marks the alloc as + // being eligible for GC, so from this point on the alloc can be gc'd + // at any time. + r.updater(alloc) + for { select { case <-r.ctx.Done(): diff --git a/client/client.go b/client/client.go index 69a5707b2..290c85815 100644 --- a/client/client.go +++ b/client/client.go @@ -1250,6 +1250,10 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { if ok { c.garbageCollector.MarkForCollection(ar) + + // Trigger a GC in case we're over thresholds and just + // waiting for eligible allocs. 
+ c.garbageCollector.Trigger() } } @@ -1568,6 +1572,10 @@ func (c *Client) runAllocs(update *allocUpdates) { add.ID, err) } } + + // Trigger the GC once more now that new allocs are started that could + // have caused thresholds to be exceeded + c.garbageCollector.Trigger() } // removeAlloc is invoked when we should remove an allocation because it has diff --git a/client/gc.go b/client/gc.go index 6e1508fce..54873e159 100644 --- a/client/gc.go +++ b/client/gc.go @@ -36,13 +36,28 @@ type AllocCounter interface { // AllocGarbageCollector garbage collects terminated allocations on a node type AllocGarbageCollector struct { - allocRunners *IndexedGCAllocPQ + config *GCConfig + + // allocRunners marked for GC + allocRunners *IndexedGCAllocPQ + + // statsCollector for node based thresholds (eg disk) statsCollector stats.NodeStatsCollector - allocCounter AllocCounter - config *GCConfig - logger *log.Logger - destroyCh chan struct{} - shutdownCh chan struct{} + + // allocCounter returns the number of un-GC'd allocs on this node + allocCounter AllocCounter + + // destroyCh is a semaphore for rate limiting concurrent garbage + // collections + destroyCh chan struct{} + + // shutdownCh is closed when the GC's run method should exit + shutdownCh chan struct{} + + // triggerCh is ticked by the Trigger method to cause a GC + triggerCh chan struct{} + + logger *log.Logger } // NewAllocGarbageCollector returns a garbage collector for terminated @@ -51,7 +66,7 @@ type AllocGarbageCollector struct { func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector { // Require at least 1 to make progress if config.ParallelDestroys <= 0 { - logger.Printf("[WARN] client: garbage collector defaulting parallism to 1 due to invalid input value of %d", config.ParallelDestroys) + logger.Printf("[WARN] client.gc: garbage collector defaulting parallism to 1 due to invalid input value of %d",
config.ParallelDestroys) config.ParallelDestroys = 1 } @@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats logger: logger, destroyCh: make(chan struct{}, config.ParallelDestroys), shutdownCh: make(chan struct{}), + triggerCh: make(chan struct{}, 1), } return gc @@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats // Run the periodic garbage collector. func (a *AllocGarbageCollector) Run() { ticker := time.NewTicker(a.config.Interval) + a.logger.Printf("[DEBUG] client.gc: GC'ing ever %v", a.config.Interval) for { select { + case <-a.triggerCh: case <-ticker.C: - if err := a.keepUsageBelowThreshold(); err != nil { - a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err) - } case <-a.shutdownCh: ticker.Stop() return } + + if err := a.keepUsageBelowThreshold(); err != nil { + a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err) + } + } +} + +// Force the garbage collector to run. 
+func (a *AllocGarbageCollector) Trigger() { + select { + case a.triggerCh <- struct{}{}: + default: + // already triggered } } @@ -116,15 +144,15 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs) } - // No reason to gc, exit if reason == "" { + // No reason to gc, exit break } // Collect an allocation gcAlloc := a.allocRunners.Pop() if gcAlloc == nil { - a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason) + a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason) break } @@ -142,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin if alloc := ar.Alloc(); alloc != nil { id = alloc.ID } - a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason) + a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason) // Acquire the destroy lock select { @@ -158,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin case <-a.shutdownCh: } - a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID) + a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID) // Release the lock <-a.destroyCh @@ -199,6 +227,11 @@ func (a *AllocGarbageCollector) CollectAll() { // MakeRoomFor garbage collects enough number of allocations in the terminal // state to make room for new allocations func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error { + if len(allocations) == 0 { + // Nothing to make room for! 
+ return nil + } + // GC allocs until below the max limit + the new allocations max := a.config.MaxAllocs - len(allocations) for a.allocCounter.NumAllocs() > max { diff --git a/client/gc_test.go b/client/gc_test.go index b09ca3fe5..af39f8096 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -310,8 +310,11 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) { defer client.Shutdown() waitTilNodeReady(client, t) + callN := 0 assertAllocs := func(expectedAll, expectedDestroyed int) { // Wait for allocs to be started + callN++ + client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed) testutil.WaitForResult(func() (bool, error) { all, destroyed := 0, 0 for _, ar := range client.getAllocRunners() { @@ -319,14 +322,19 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) { if ar.IsDestroyed() { destroyed++ } + + // assert is waiting + // 2017/10/26 21:38:01.649166 } return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf( "expected %d allocs (found %d); expected %d destroy (found %d)", expectedAll, all, expectedDestroyed, destroyed, ) }, func(err error) { - t.Fatalf("alloc state: %v", err) + client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed) + t.Fatalf("%d alloc state: %v", callN, err) }) + client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed) } // Create a job @@ -374,15 +382,18 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) { t.Fatalf("error upserting stopped allocs: %v", err) } - // 7 total, 0 GC'd still, but 4 should be marked for GC - assertAllocs(7, 0) + // 7 total, 1 GC'd to get down to limit of 6 + assertAllocs(7, 1) // Add one more alloc if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil { t.Fatalf("error upserting new alloc: %v", err) } - // 8 
total, 2 GC'd to get down to limit of 6 + // 8 total, 1 more GC'd (2 in total) to get down to limit of 6 + // If this fails it may be due to the gc's Run and MakeRoomFor methods + // gc'ing concurrently. May have to disable gc's run loop if this test + // is flaky. assertAllocs(8, 2) // Add new allocs to cause the gc of old terminal ones