Trigger GCs after alloc changes

GC much more aggressively by triggering GCs when allocations become
terminal as well as after new allocations are added.
Michael Schurter 2017-10-26 14:03:51 -07:00 committed by Preetha Appan
parent 2a81160dcd
commit 73e9b57908
4 changed files with 75 additions and 19 deletions
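
For orientation, the heart of the change is a small trigger channel: the GC loop still runs on its periodic ticker, but callers can now ask for an immediate pass. Below is a minimal, self-contained sketch of that pattern. Only the names triggerCh, Trigger, and keepUsageBelowThreshold are taken from the diff; everything else is simplified stand-in code, not the Nomad implementation.

package main

import (
	"fmt"
	"time"
)

// gc is a stripped-down stand-in for AllocGarbageCollector.
type gc struct {
	triggerCh  chan struct{} // buffered to 1 so duplicate triggers coalesce
	shutdownCh chan struct{}
}

// Trigger asks the run loop for an immediate GC pass without blocking.
func (g *gc) Trigger() {
	select {
	case g.triggerCh <- struct{}{}:
	default: // a pass is already pending
	}
}

// run does one GC pass whenever it is triggered or the ticker fires.
func (g *gc) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-g.triggerCh: // an alloc became terminal or new allocs arrived
		case <-ticker.C: // periodic fallback
		case <-g.shutdownCh:
			return
		}
		g.keepUsageBelowThreshold()
	}
}

func (g *gc) keepUsageBelowThreshold() { fmt.Println("gc pass") }

func main() {
	g := &gc{triggerCh: make(chan struct{}, 1), shutdownCh: make(chan struct{})}
	go g.run(time.Minute)
	g.Trigger() // a pass runs promptly instead of waiting up to a minute
	time.Sleep(100 * time.Millisecond)
	close(g.shutdownCh)
}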


@@ -921,7 +921,6 @@ func (r *AllocRunner) handleDestroy() {
// Final state sync. We do this to ensure that the server has the correct
// state as we wait for a destroy.
alloc := r.Alloc()
r.updater(alloc)
// Broadcast and persist state synchronously
r.sendBroadcast(alloc)
@@ -936,6 +935,11 @@ func (r *AllocRunner) handleDestroy() {
r.logger.Printf("[ERR] client: alloc %q unable to unmount task directories: %v", r.allocID, err)
}
// Update the server with the alloc's status -- also marks the alloc as
// being eligible for GC, so from this point on the alloc can be gc'd
// at any time.
r.updater(alloc)
for {
select {
case <-r.ctx.Done():
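
The point of moving r.updater(alloc) is ordering: the status update doubles as the GC-eligibility signal, so it must come after local cleanup. A tiny illustrative sketch of that invariant, using hypothetical function names rather than the AllocRunner API:

package main

import "log"

// finishAlloc mirrors the new handleDestroy ordering: attempt cleanup
// first, then publish the terminal status, because the status update is
// what makes the alloc fair game for the garbage collector.
func finishAlloc(unmountTaskDirs func() error, updateServer func()) {
	if err := unmountTaskDirs(); err != nil {
		log.Printf("[ERR] unable to unmount task directories: %v", err)
	}
	updateServer() // from here on the alloc may be GC'd at any time
}

func main() {
	finishAlloc(
		func() error { return nil },
		func() { log.Println("server updated; alloc is now GC-eligible") },
	)
}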


@@ -1250,6 +1250,10 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
if ok {
c.garbageCollector.MarkForCollection(ar)
// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
c.garbageCollector.Trigger()
}
}
@@ -1568,6 +1572,10 @@ func (c *Client) runAllocs(update *allocUpdates) {
add.ID, err)
}
}
// Trigger the GC once more now that new allocs have started and could
// have caused thresholds to be exceeded
c.garbageCollector.Trigger()
}
// removeAlloc is invoked when we should remove an allocation because it has


@@ -36,13 +36,28 @@ type AllocCounter interface {
// AllocGarbageCollector garbage collects terminated allocations on a node
type AllocGarbageCollector struct {
allocRunners *IndexedGCAllocPQ
statsCollector stats.NodeStatsCollector
allocCounter AllocCounter
config *GCConfig
logger *log.Logger
// allocRunners marked for GC
allocRunners *IndexedGCAllocPQ
// statsCollector for node-based thresholds (e.g. disk)
statsCollector stats.NodeStatsCollector
// allocCounter returns the number of un-GC'd allocs on this node
allocCounter AllocCounter
// destroyCh is a semaphore for rate limiting concurrent garbage
// collections
destroyCh chan struct{}
// shutdownCh is closed when the GC's run method should exit
shutdownCh chan struct{}
// triggerCh is ticked by the Trigger method to cause a GC
triggerCh chan struct{}
logger *log.Logger
}
// NewAllocGarbageCollector returns a garbage collector for terminated
@@ -51,7 +66,7 @@ type AllocGarbageCollector struct {
func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStatsCollector, ac AllocCounter, config *GCConfig) *AllocGarbageCollector {
// Require at least 1 to make progress
if config.ParallelDestroys <= 0 {
logger.Printf("[WARN] client: garbage collector defaulting parallelism to 1 due to invalid input value of %d", config.ParallelDestroys)
logger.Printf("[WARN] client.gc: garbage collector defaulting parallelism to 1 due to invalid input value of %d", config.ParallelDestroys)
config.ParallelDestroys = 1
}
@@ -63,6 +78,7 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
logger: logger,
destroyCh: make(chan struct{}, config.ParallelDestroys),
shutdownCh: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
}
return gc
@@ -71,16 +87,28 @@ func NewAllocGarbageCollector(logger *log.Logger, statsCollector stats.NodeStats
// Run the periodic garbage collector.
func (a *AllocGarbageCollector) Run() {
ticker := time.NewTicker(a.config.Interval)
a.logger.Printf("[DEBUG] client.gc: GC'ing every %v", a.config.Interval)
for {
select {
case <-a.triggerCh:
case <-ticker.C:
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client: error garbage collecting allocation: %v", err)
}
case <-a.shutdownCh:
ticker.Stop()
return
}
if err := a.keepUsageBelowThreshold(); err != nil {
a.logger.Printf("[ERR] client.gc: error garbage collecting allocation: %v", err)
}
}
}
// Trigger forces the garbage collector to run.
func (a *AllocGarbageCollector) Trigger() {
select {
case a.triggerCh <- struct{}{}:
default:
// already triggered
}
}
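
Trigger never blocks and never queues more than one pending pass, which is why the two new Client call sites (updateAllocStatus and runAllocs) can call it on every status change and every batch of new allocs. A standalone sketch of that coalescing behavior using plain Go channels, not the Nomad types:

package main

import "fmt"

func main() {
	triggerCh := make(chan struct{}, 1)
	trigger := func() {
		select {
		case triggerCh <- struct{}{}:
		default: // a GC pass is already pending; coalesce
		}
	}

	// A burst of events before the GC goroutine wakes up: several allocs
	// turn terminal and a new batch of allocs is placed.
	for i := 0; i < 5; i++ {
		trigger()
	}

	fmt.Println("pending GC passes:", len(triggerCh)) // prints 1, not 5
}
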
@@ -116,15 +144,15 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error {
reason = fmt.Sprintf("number of allocations (%d) is over the limit (%d)", liveAllocs, a.config.MaxAllocs)
}
// No reason to gc, exit
if reason == "" {
// No reason to gc, exit
break
}
// Collect an allocation
gcAlloc := a.allocRunners.Pop()
if gcAlloc == nil {
a.logger.Printf("[WARN] client: garbage collection due to %s skipped because no terminal allocations", reason)
a.logger.Printf("[WARN] client.gc: garbage collection due to %s skipped because no terminal allocations", reason)
break
}
@@ -142,7 +170,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
if alloc := ar.Alloc(); alloc != nil {
id = alloc.ID
}
a.logger.Printf("[INFO] client: garbage collecting allocation %s due to %s", id, reason)
a.logger.Printf("[INFO] client.gc: garbage collecting allocation %s due to %s", id, reason)
// Acquire the destroy lock
select {
@@ -158,7 +186,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(ar *AllocRunner, reason strin
case <-a.shutdownCh:
}
a.logger.Printf("[DEBUG] client: garbage collected %q", ar.Alloc().ID)
a.logger.Printf("[DEBUG] client.gc: garbage collected %q", ar.Alloc().ID)
// Release the lock
<-a.destroyCh
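
destroyCh acts as a counting semaphore built from a buffered channel: a send acquires one of ParallelDestroys slots (blocking when they are all busy) and a receive releases it. A minimal standalone sketch of that rate-limiting pattern, not the Nomad code:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const parallelDestroys = 2
	destroyCh := make(chan struct{}, parallelDestroys) // semaphore slots

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			destroyCh <- struct{}{}        // acquire a slot; blocks if both are busy
			defer func() { <-destroyCh }() // release the slot

			fmt.Printf("destroying alloc %d (in flight: %d)\n", id, len(destroyCh))
			time.Sleep(50 * time.Millisecond) // stand-in for the real destroy work
		}(i)
	}
	wg.Wait()
}
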
@@ -199,6 +227,11 @@ func (a *AllocGarbageCollector) CollectAll() {
// MakeRoomFor garbage collects enough allocations in the terminal state to
// make room for new allocations
func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) error {
if len(allocations) == 0 {
// Nothing to make room for!
return nil
}
// GC allocs until below the max limit + the new allocations
max := a.config.MaxAllocs - len(allocations)
for a.allocCounter.NumAllocs() > max {
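
The arithmetic above reserves head-room for the incoming batch before it lands: terminal allocs are collected until the number of un-GC'd allocs plus the new allocations fits under MaxAllocs. A small sketch of just that bookkeeping, with illustrative numbers and names rather than the Nomad API:

package main

import "fmt"

// gcsNeeded returns how many terminal allocs must be collected so that
// the allocs already on the node plus the incoming batch stay at or
// under maxAllocs. The real MakeRoomFor pops actual alloc runners off a
// priority queue instead of doing plain arithmetic.
func gcsNeeded(maxAllocs, liveAllocs, incoming int) int {
	if incoming == 0 {
		return 0 // nothing to make room for
	}
	max := maxAllocs - incoming
	if liveAllocs <= max {
		return 0
	}
	return liveAllocs - max
}

func main() {
	// e.g. a limit of 6 allocs, 7 already on the node, 2 more arriving:
	fmt.Println(gcsNeeded(6, 7, 2)) // 3 terminal allocs must be GC'd
}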


@@ -310,8 +310,11 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
defer client.Shutdown()
waitTilNodeReady(client, t)
callN := 0
assertAllocs := func(expectedAll, expectedDestroyed int) {
// Wait for allocs to be started
callN++
client.logger.Printf("[TEST] %d -- Waiting for %d total allocs, %d GC'd", callN, expectedAll, expectedDestroyed)
testutil.WaitForResult(func() (bool, error) {
all, destroyed := 0, 0
for _, ar := range client.getAllocRunners() {
@@ -319,14 +322,19 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
if ar.IsDestroyed() {
destroyed++
}
}
return all == expectedAll && destroyed == expectedDestroyed, fmt.Errorf(
"expected %d allocs (found %d); expected %d destroy (found %d)",
expectedAll, all, expectedDestroyed, destroyed,
)
}, func(err error) {
t.Fatalf("alloc state: %v", err)
client.logger.Printf("[TEST] %d -- FAILED to find %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
t.Fatalf("%d alloc state: %v", callN, err)
})
client.logger.Printf("[TEST] %d -- Found %d total allocs, %d GC'd!", callN, expectedAll, expectedDestroyed)
}
// Create a job
@@ -374,15 +382,18 @@ func TestAllocGarbageCollector_MakeRoomForAllocations_MaxAllocs(t *testing.T) {
t.Fatalf("error upserting stopped allocs: %v", err)
}
// 7 total, 0 GC'd still, but 4 should be marked for GC
assertAllocs(7, 0)
// 7 total, 1 GC'd to get down to limit of 6
assertAllocs(7, 1)
// Add one more alloc
if err := state.UpsertAllocs(102, []*structs.Allocation{newAlloc()}); err != nil {
t.Fatalf("error upserting new alloc: %v", err)
}
// 8 total, 2 GC'd to get down to limit of 6
// 8 total, 1 GC'd to get down to limit of 6
// If this fails it may be due to the gc's Run and MakeRoomFor methods
// gc'ing concurrently. May have to disable gc's run loop if this test
// is flaky.
assertAllocs(8, 2)
// Add new allocs to cause the gc of old terminal ones