open-nomad/nomad/drainer/drain_heap.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

164 lines
3.7 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
2018-03-08 23:08:23 +00:00
package drainer
2018-03-02 00:37:19 +00:00
import (
2018-03-02 23:19:55 +00:00
"context"
"sync"
2018-03-02 00:37:19 +00:00
"time"
)
2018-03-02 23:19:55 +00:00
// DrainDeadlineNotifier tracks drain deadlines for nodes and reports, in
// batches, the nodes whose deadline has been reached.
type DrainDeadlineNotifier interface {
	// Watch marks the given node to be watched for the given deadline.
	Watch(nodeID string, deadline time.Time)

	// Remove stops tracking the deadline of the given node.
	Remove(nodeID string)

	// NextBatch returns the channel on which the next batch of node IDs
	// that have reached their deadline is delivered.
	NextBatch() <-chan []string
}
2018-03-02 23:19:55 +00:00
// deadlineHeap implements DrainDeadlineNotifier. Despite its name, the
// deadlines are kept in a plain map that is scanned to find the next node to
// reach its deadline. Deadlines that fall close together are coalesced into a
// single emitted batch.
type deadlineHeap struct {
	// ctx stops the background watch loop when it is cancelled.
	ctx context.Context

	// coalesceWindow is the span after the earliest deadline within which
	// other deadlines are folded into the same batch.
	coalesceWindow time.Duration

	// batch carries the IDs of nodes that have reached their deadline.
	batch chan []string

	// nodes maps a node ID to its drain deadline. Guarded by mu.
	nodes map[string]time.Time

	// trigger wakes the watch loop after the set of tracked nodes changes.
	trigger chan struct{}

	// mu protects nodes.
	mu sync.Mutex
}
// NewDeadlineHeap returns a new deadline heap that coalesces for the given
// duration and will stop watching when the passed context is cancelled.
func NewDeadlineHeap(ctx context.Context, coalesceWindow time.Duration) *deadlineHeap {
d := &deadlineHeap{
ctx: ctx,
coalesceWindow: coalesceWindow,
2018-03-08 21:25:09 +00:00
batch: make(chan []string),
2018-03-02 23:19:55 +00:00
nodes: make(map[string]time.Time, 64),
trigger: make(chan struct{}, 1),
2018-03-02 23:19:55 +00:00
}
go d.watch()
return d
}
func (d *deadlineHeap) watch() {
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
2018-03-02 23:19:55 +00:00
}
defer timer.Stop()
2018-03-30 16:58:29 +00:00
var nextDeadline time.Time
2018-03-02 23:19:55 +00:00
for {
select {
case <-d.ctx.Done():
return
case <-timer.C:
var batch []string
d.mu.Lock()
2018-03-02 23:19:55 +00:00
for nodeID, nodeDeadline := range d.nodes {
if !nodeDeadline.After(nextDeadline) {
batch = append(batch, nodeID)
2018-03-08 21:25:09 +00:00
delete(d.nodes, nodeID)
2018-03-02 23:19:55 +00:00
}
}
2018-03-08 21:25:09 +00:00
d.mu.Unlock()
2018-03-02 23:19:55 +00:00
if len(batch) > 0 {
// Send the batch
select {
case d.batch <- batch:
case <-d.ctx.Done():
return
}
2018-03-02 23:19:55 +00:00
}
case <-d.trigger:
}
// Calculate the next deadline
2018-03-02 23:19:55 +00:00
deadline, ok := d.calculateNextDeadline()
if !ok {
continue
}
// If the deadline is zero, it is a force drain. Otherwise if the
// deadline is in the future, see if we already have a timer setup to
// handle it. If we don't create the timer.
2018-03-30 16:58:29 +00:00
if deadline.IsZero() || !deadline.Equal(nextDeadline) {
2020-12-09 19:05:18 +00:00
timer.Reset(time.Until(deadline))
2018-03-02 23:19:55 +00:00
nextDeadline = deadline
}
}
}
// calculateNextDeadline returns the next deadline in which to scan for
// deadlined nodes. It applies the coalesce window.
func (d *deadlineHeap) calculateNextDeadline() (time.Time, bool) {
2018-03-08 21:25:09 +00:00
d.mu.Lock()
defer d.mu.Unlock()
2018-03-02 23:19:55 +00:00
if len(d.nodes) == 0 {
return time.Time{}, false
}
// Calculate the new timer value
var deadline time.Time
for _, v := range d.nodes {
if deadline.IsZero() || v.Before(deadline) {
deadline = v
}
}
var maxWithinWindow time.Time
coalescedDeadline := deadline.Add(d.coalesceWindow)
for _, nodeDeadline := range d.nodes {
if nodeDeadline.Before(coalescedDeadline) {
if maxWithinWindow.IsZero() || nodeDeadline.After(maxWithinWindow) {
maxWithinWindow = nodeDeadline
}
}
}
return maxWithinWindow, true
2018-03-02 00:37:19 +00:00
}
2018-03-02 23:19:55 +00:00
// NextBatch exposes, as a receive-only channel, the stream of batches of
// node IDs that have reached their deadline and are to be drained.
func (d *deadlineHeap) NextBatch() <-chan []string {
	var out <-chan []string = d.batch
	return out
}
func (d *deadlineHeap) Remove(nodeID string) {
2018-03-08 21:25:09 +00:00
d.mu.Lock()
defer d.mu.Unlock()
2018-03-02 23:19:55 +00:00
delete(d.nodes, nodeID)
select {
2018-03-08 21:25:09 +00:00
case d.trigger <- struct{}{}:
2018-03-02 23:19:55 +00:00
default:
}
}
func (d *deadlineHeap) Watch(nodeID string, deadline time.Time) {
2018-03-08 21:25:09 +00:00
d.mu.Lock()
defer d.mu.Unlock()
2018-03-02 23:19:55 +00:00
d.nodes[nodeID] = deadline
select {
2018-03-08 21:25:09 +00:00
case d.trigger <- struct{}{}:
2018-03-02 23:19:55 +00:00
default:
}
}