open-consul/agent/consul/controller/defer.go

225 lines
5.4 KiB
Go

package controller
import (
"container/heap"
"context"
"time"
)
// much of this is a re-implementation of
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/delaying_queue.go
// DeferQueue is a generic priority queue implementation that
// allows for deferring and later processing Requests.
type DeferQueue interface {
// Defer defers processing a Request until a given time. When
// the timeout is hit, the request will be processed by the
// callback given in the Process loop. If the given context
// is canceled, the item is not deferred.
Defer(ctx context.Context, item Request, until time.Time)
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
// long-lived goroutine.
Process(ctx context.Context, callback func(item Request))
}
// deferredRequest is a wrapped Request with information about
// when a retry should be attempted
type deferredRequest struct {
enqueueAt time.Time
item Request
// index holds the index for the given heap entry so that if
// the entry is updated the heap can be re-sorted
index int
}
// deferQueue is a priority queue for deferring Requests for
// future processing
type deferQueue struct {
heap *deferHeap
entries map[Request]*deferredRequest
addChannel chan *deferredRequest
heartbeat *time.Ticker
nextReadyTimer *time.Timer
}
// NewDeferQueue returns a priority queue for deferred Requests.
func NewDeferQueue(tick time.Duration) DeferQueue {
dHeap := &deferHeap{}
heap.Init(dHeap)
return &deferQueue{
heap: dHeap,
entries: make(map[Request]*deferredRequest),
addChannel: make(chan *deferredRequest),
heartbeat: time.NewTicker(tick),
}
}
// Defer defers the given Request until the given time in the future. If the
// passed in context is canceled before the Request is deferred, then this
// immediately returns.
func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
entry := &deferredRequest{
enqueueAt: until,
item: item,
}
select {
case <-ctx.Done():
case q.addChannel <- entry:
}
}
// deferEntry adds a deferred request to the priority queue
func (q *deferQueue) deferEntry(entry *deferredRequest) {
existing, exists := q.entries[entry.item]
if exists {
// insert or update the item deferral time
if existing.enqueueAt.After(entry.enqueueAt) {
existing.enqueueAt = entry.enqueueAt
heap.Fix(q.heap, existing.index)
}
return
}
heap.Push(q.heap, entry)
q.entries[entry.item] = entry
}
// readyRequest returns a pointer to the next ready Request or
// nil if no Requests are ready to be processed
func (q *deferQueue) readyRequest() *Request {
if q.heap.Len() == 0 {
return nil
}
now := time.Now()
entry := q.heap.Peek().(*deferredRequest)
if entry.enqueueAt.After(now) {
return nil
}
entry = heap.Pop(q.heap).(*deferredRequest)
delete(q.entries, entry.item)
return &entry.item
}
// signalReady returns a timer signal to the next Request
// that will be ready on the queue
func (q *deferQueue) signalReady() <-chan time.Time {
if q.heap.Len() == 0 {
return make(<-chan time.Time)
}
if q.nextReadyTimer != nil {
q.nextReadyTimer.Stop()
}
now := time.Now()
entry := q.heap.Peek().(*deferredRequest)
q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now))
return q.nextReadyTimer.C
}
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
// long-lived goroutine.
func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
for {
ready := q.readyRequest()
if ready != nil {
callback(*ready)
}
signalReady := q.signalReady()
select {
case <-ctx.Done():
if q.nextReadyTimer != nil {
q.nextReadyTimer.Stop()
}
q.heartbeat.Stop()
return
case <-q.heartbeat.C:
// continue the loop, which process ready items
case <-signalReady:
// continue the loop, which process ready items
case entry := <-q.addChannel:
enqueueOrProcess := func(entry *deferredRequest) {
now := time.Now()
if entry.enqueueAt.After(now) {
q.deferEntry(entry)
} else {
// fast-path, process immediately if we don't need to defer
callback(entry.item)
}
}
enqueueOrProcess(entry)
// drain the add channel before we do anything else
drained := false
for !drained {
select {
case entry := <-q.addChannel:
enqueueOrProcess(entry)
default:
drained = true
}
}
}
}
}
var _ heap.Interface = &deferHeap{}
// deferHeap implements heap.Interface
type deferHeap []*deferredRequest
// Len returns the length of the heap.
func (h deferHeap) Len() int {
return len(h)
}
// Less compares heap items for purposes of sorting.
func (h deferHeap) Less(i, j int) bool {
return h[i].enqueueAt.Before(h[j].enqueueAt)
}
// Swap swaps two entries in the heap.
func (h deferHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
// Push pushes an entry onto the heap.
func (h *deferHeap) Push(x interface{}) {
n := len(*h)
item := x.(*deferredRequest)
item.index = n
*h = append(*h, item)
}
// Pop pops an entry off the heap.
func (h *deferHeap) Pop() interface{} {
n := len(*h)
item := (*h)[n-1]
item.index = -1
*h = (*h)[0:(n - 1)]
return item
}
// Peek returns the next item on the heap.
func (h deferHeap) Peek() interface{} {
return h[0]
}