controller: make the `WorkQueue` generic (#16982)
commit 34786c71cd (parent 36c7edf13c)
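
At its core, the change parameterizes the work queue over its item type: `WorkQueue` becomes `WorkQueue[T ItemType]` in a new `queue` package, where `ItemType` is simply `comparable`, and the controller instantiates it as `queue.WorkQueue[Request]`. As a hedged sketch of the resulting API (the `Task` type and the timings here are illustrative, not part of the commit), an instantiated queue reads like this:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/consul/agent/consul/controller/queue"
)

// Task is an illustrative item type; any comparable type satisfies
// the queue.ItemType constraint introduced by this commit.
type Task struct{ ID string }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The type argument fixes the item type for the whole queue.
	q := queue.RunWorkQueue[Task](ctx, 5*time.Millisecond, 1000*time.Second)

	q.Add(Task{ID: "one"})
	if item, shutdown := q.Get(); !shutdown {
		fmt.Println("processing", item.ID)
		q.Done(item) // every item returned by Get must be marked Done
	}
}
```
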
@@ -11,11 +11,13 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/hashicorp/go-hclog"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/hashicorp/consul/agent/consul/controller/queue"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
-	"github.com/hashicorp/go-hclog"
-	"golang.org/x/sync/errgroup"
 )
 
 // much of this is a re-implementation of
@@ -53,7 +55,7 @@ type Controller interface {
 	// WithQueueFactory allows a Controller to replace its underlying work queue
 	// implementation. This is most useful for testing. This should only ever be called
 	// prior to running Run.
-	WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
+	WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller
 	// AddTrigger allows for triggering a reconciliation request when a
 	// triggering function returns, when the passed in context is canceled
 	// the trigger must return
@@ -79,11 +81,11 @@ type controller struct {
 
 	// makeQueue is the factory used for creating the work queue, generally
 	// this shouldn't be touched, but can be updated for testing purposes
-	makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue
+	makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]
 	// workers is the number of workers to use to process data
 	workers int
 	// work is the internal work queue that pending Requests are added to
-	work WorkQueue
+	work queue.WorkQueue[Request]
 	// baseBackoff is the starting backoff time for the work queue's rate limiter
 	baseBackoff time.Duration
 	// maxBackoff is the maximum backoff time for the work queue's rate limiter
@@ -125,7 +127,7 @@ func New(publisher state.EventPublisher, reconciler Reconciler) Controller {
 		workers:     1,
 		baseBackoff: 5 * time.Millisecond,
 		maxBackoff:  1000 * time.Second,
-		makeQueue:   RunWorkQueue,
+		makeQueue:   queue.RunWorkQueue[Request],
 		started:     make(chan struct{}),
 		triggers:    make(map[Request]func()),
 		logger:      hclog.NewNullLogger(),
@@ -179,7 +181,7 @@ func (c *controller) WithLogger(logger hclog.Logger) Controller {
 // WithQueueFactory changes the initialization method for the Controller's work
 // queue, this is predominantly just used for testing. This should only ever be called
 // prior to running Start.
-func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller {
+func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller {
 	c.ensureNotRunning()
 
 	c.makeQueue = fn
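
On the controller side `Request` stays the item type, so call sites change spelling only: `WorkQueue` becomes `queue.WorkQueue[Request]` and `RunWorkQueue` becomes `queue.RunWorkQueue[Request]`. A minimal sketch of a caller-supplied factory under the new signature, mirroring the test changes below (assumes a `publisher` and `reconciler` are in scope):

```go
// Sketch: injecting a queue factory with the new generic signature.
// New, WithQueueFactory, and Request are the controller APIs in this diff.
ctrl := New(publisher, reconciler).
	WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
		return queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff)
	})
```
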
@@ -14,6 +14,7 @@ import (
 	"github.com/hashicorp/go-memdb"
 	"github.com/stretchr/testify/require"
 
+	"github.com/hashicorp/consul/agent/consul/controller/queue"
 	"github.com/hashicorp/consul/agent/consul/fsm"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
@@ -144,13 +145,13 @@ func TestBasicController_Retry(t *testing.T) {
 		StorageBackend: fsm.NullStorageBackend,
 	}).State()
 
-	queueInitialized := make(chan *countingWorkQueue)
+	queueInitialized := make(chan *countingWorkQueue[Request])
 	controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
 		Topic:   state.EventTopicIngressGateway,
 		Subject: stream.SubjectWildcard,
 	}).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
-	go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
-		queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff))
+	go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
+		queue := newCountingWorkQueue(queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff))
 		queueInitialized <- queue
 		return queue
 	}).Run(ctx)
@@ -244,9 +245,9 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
 	started := make(chan struct{})
 	reconciler := newTestReconciler(false)
 	publisher := stream.NewEventPublisher(0)
-	controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
+	controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
 		close(started)
-		return RunWorkQueue(ctx, baseBackoff, maxBackoff)
+		return queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff)
 	})
 	subscription := &stream.SubscribeRequest{
 		Topic: state.EventTopicIngressGateway,
@@ -276,7 +277,7 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
 		controller.WithWorkers(1)
 	})
 	require.Panics(t, func() {
-		controller.WithQueueFactory(RunWorkQueue)
+		controller.WithQueueFactory(queue.RunWorkQueue[Request])
 	})
 }
@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
 	"container/heap"
@@ -14,24 +14,24 @@ import (
 
 // DeferQueue is a generic priority queue implementation that
 // allows for deferring and later processing Requests.
-type DeferQueue interface {
+type DeferQueue[T ItemType] interface {
 	// Defer defers processing a Request until a given time. When
 	// the timeout is hit, the request will be processed by the
 	// callback given in the Process loop. If the given context
 	// is canceled, the item is not deferred.
-	Defer(ctx context.Context, item Request, until time.Time)
+	Defer(ctx context.Context, item T, until time.Time)
 	// Process processes all items in the defer queue with the
 	// given callback, blocking until the given context is canceled.
 	// Callers should only ever call Process once, likely in a
 	// long-lived goroutine.
-	Process(ctx context.Context, callback func(item Request))
+	Process(ctx context.Context, callback func(item T))
 }
 
 // deferredRequest is a wrapped Request with information about
 // when a retry should be attempted
-type deferredRequest struct {
+type deferredRequest[T ItemType] struct {
 	enqueueAt time.Time
-	item      Request
+	item      T
 	// index holds the index for the given heap entry so that if
 	// the entry is updated the heap can be re-sorted
 	index int
@@ -39,24 +39,24 @@ type deferredRequest struct {
 
 // deferQueue is a priority queue for deferring Requests for
 // future processing
-type deferQueue struct {
-	heap    *deferHeap
-	entries map[Request]*deferredRequest
+type deferQueue[T ItemType] struct {
+	heap    *deferHeap[T]
+	entries map[T]*deferredRequest[T]
 
-	addChannel     chan *deferredRequest
+	addChannel     chan *deferredRequest[T]
 	heartbeat      *time.Ticker
 	nextReadyTimer *time.Timer
 }
 
 // NewDeferQueue returns a priority queue for deferred Requests.
-func NewDeferQueue(tick time.Duration) DeferQueue {
-	dHeap := &deferHeap{}
+func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {
+	dHeap := &deferHeap[T]{}
 	heap.Init(dHeap)
 
-	return &deferQueue{
+	return &deferQueue[T]{
 		heap:       dHeap,
-		entries:    make(map[Request]*deferredRequest),
-		addChannel: make(chan *deferredRequest),
+		entries:    make(map[T]*deferredRequest[T]),
+		addChannel: make(chan *deferredRequest[T]),
 		heartbeat:  time.NewTicker(tick),
 	}
 }
@@ -64,8 +64,8 @@ func NewDeferQueue(tick time.Duration) DeferQueue {
 // Defer defers the given Request until the given time in the future. If the
 // passed in context is canceled before the Request is deferred, then this
 // immediately returns.
-func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
-	entry := &deferredRequest{
+func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
+	entry := &deferredRequest[T]{
 		enqueueAt: until,
 		item:      item,
 	}
@@ -77,7 +77,7 @@ func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
 }
 
 // deferEntry adds a deferred request to the priority queue
-func (q *deferQueue) deferEntry(entry *deferredRequest) {
+func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
 	existing, exists := q.entries[entry.item]
 	if exists {
 		// insert or update the item deferral time
@@ -95,26 +95,26 @@ func (q *deferQueue) deferEntry(entry *deferredRequest) {
 
 // readyRequest returns a pointer to the next ready Request or
 // nil if no Requests are ready to be processed
-func (q *deferQueue) readyRequest() *Request {
+func (q *deferQueue[T]) readyRequest() *T {
 	if q.heap.Len() == 0 {
 		return nil
 	}
 
 	now := time.Now()
 
-	entry := q.heap.Peek().(*deferredRequest)
+	entry := q.heap.Peek().(*deferredRequest[T])
 	if entry.enqueueAt.After(now) {
 		return nil
 	}
 
-	entry = heap.Pop(q.heap).(*deferredRequest)
+	entry = heap.Pop(q.heap).(*deferredRequest[T])
 	delete(q.entries, entry.item)
 	return &entry.item
 }
 
 // signalReady returns a timer signal to the next Request
 // that will be ready on the queue
-func (q *deferQueue) signalReady() <-chan time.Time {
+func (q *deferQueue[T]) signalReady() <-chan time.Time {
 	if q.heap.Len() == 0 {
 		return make(<-chan time.Time)
 	}
@@ -123,7 +123,7 @@ func (q *deferQueue) signalReady() <-chan time.Time {
 		q.nextReadyTimer.Stop()
 	}
 	now := time.Now()
-	entry := q.heap.Peek().(*deferredRequest)
+	entry := q.heap.Peek().(*deferredRequest[T])
 	q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now))
 	return q.nextReadyTimer.C
 }
@@ -132,7 +132,7 @@ func (q *deferQueue) signalReady() <-chan time.Time {
 // given callback, blocking until the given context is canceled.
 // Callers should only ever call Process once, likely in a
 // long-lived goroutine.
-func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
+func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) {
 	for {
 		ready := q.readyRequest()
 		if ready != nil {
@@ -156,7 +156,7 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
 			// continue the loop, which process ready items
 
 		case entry := <-q.addChannel:
-			enqueueOrProcess := func(entry *deferredRequest) {
+			enqueueOrProcess := func(entry *deferredRequest[T]) {
 				now := time.Now()
 				if entry.enqueueAt.After(now) {
 					q.deferEntry(entry)
@@ -182,38 +182,38 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
 	}
 }
 
-var _ heap.Interface = &deferHeap{}
+var _ heap.Interface = &deferHeap[string]{}
 
 // deferHeap implements heap.Interface
-type deferHeap []*deferredRequest
+type deferHeap[T ItemType] []*deferredRequest[T]
 
 // Len returns the length of the heap.
-func (h deferHeap) Len() int {
+func (h deferHeap[T]) Len() int {
 	return len(h)
 }
 
 // Less compares heap items for purposes of sorting.
-func (h deferHeap) Less(i, j int) bool {
+func (h deferHeap[T]) Less(i, j int) bool {
 	return h[i].enqueueAt.Before(h[j].enqueueAt)
 }
 
 // Swap swaps two entries in the heap.
-func (h deferHeap) Swap(i, j int) {
+func (h deferHeap[T]) Swap(i, j int) {
 	h[i], h[j] = h[j], h[i]
 	h[i].index = i
 	h[j].index = j
}
 
 // Push pushes an entry onto the heap.
-func (h *deferHeap) Push(x interface{}) {
+func (h *deferHeap[T]) Push(x interface{}) {
 	n := len(*h)
-	item := x.(*deferredRequest)
+	item := x.(*deferredRequest[T])
 	item.index = n
 	*h = append(*h, item)
 }
 
 // Pop pops an entry off the heap.
-func (h *deferHeap) Pop() interface{} {
+func (h *deferHeap[T]) Pop() interface{} {
 	n := len(*h)
 	item := (*h)[n-1]
 	item.index = -1
@@ -222,6 +222,6 @@ func (h *deferHeap) Pop() interface{} {
 }
 
 // Peek returns the next item on the heap.
-func (h deferHeap) Peek() interface{} {
+func (h deferHeap[T]) Peek() interface{} {
 	return h[0]
 }
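
`DeferQueue`'s contract survives the move to generics unchanged: any number of `Defer` calls feed one long-lived `Process` loop. A hedged usage sketch (the string item type, tick, and delay are illustrative):

```go
// Sketch: deferred processing with the now-generic DeferQueue.
func deferExample(ctx context.Context) {
	dq := queue.NewDeferQueue[string](500 * time.Millisecond)

	// Process blocks until ctx is canceled; call it exactly once,
	// in a long-lived goroutine.
	go dq.Process(ctx, func(item string) {
		fmt.Println("ready:", item)
	})

	// After one second the item is handed to the Process callback.
	dq.Defer(ctx, "retry-me", time.Now().Add(time.Second))
}
```
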
@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
 	"context"
@@ -12,43 +12,46 @@ import (
 // much of this is a re-implementation of
 // https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go
 
+// ItemType is the type constraint for items in the WorkQueue.
+type ItemType comparable
+
 // WorkQueue is an interface for a work queue with semantics to help with
 // retries and rate limiting.
-type WorkQueue interface {
+type WorkQueue[T ItemType] interface {
 	// Get retrieves the next Request in the queue, blocking until a Request is
 	// available, if shutdown is true, then the queue is shutting down and should
 	// no longer be used by the caller.
-	Get() (item Request, shutdown bool)
+	Get() (item T, shutdown bool)
 	// Add immediately adds a Request to the work queue.
-	Add(item Request)
+	Add(item T)
 	// AddAfter adds a Request to the work queue after a given amount of time.
-	AddAfter(item Request, duration time.Duration)
+	AddAfter(item T, duration time.Duration)
 	// AddRateLimited adds a Request to the work queue after the amount of time
 	// specified by applying the queue's rate limiter.
-	AddRateLimited(item Request)
+	AddRateLimited(item T)
 	// Forget signals the queue to reset the rate-limiting for the given Request.
-	Forget(item Request)
+	Forget(item T)
 	// Done tells the work queue that the Request has been successfully processed
 	// and can be deleted from the queue.
-	Done(item Request)
+	Done(item T)
 }
 
 // queue implements a rate-limited work queue
-type queue struct {
+type queue[T ItemType] struct {
 	// queue holds an ordered list of Requests needing to be processed
-	queue []Request
+	queue []T
 
 	// dirty holds the working set of all Requests, whether they are being
 	// processed or not
-	dirty map[Request]struct{}
+	dirty map[T]struct{}
 	// processing holds the set of current requests being processed
-	processing map[Request]struct{}
+	processing map[T]struct{}
 
 	// deferred is an internal priority queue that tracks deferred
 	// Requests
-	deferred DeferQueue
+	deferred DeferQueue[T]
 	// ratelimiter is the internal rate-limiter for the queue
-	ratelimiter Limiter
+	ratelimiter Limiter[T]
 
 	// cond synchronizes queue access and handles signalling for when
 	// data is available in the queue
@@ -58,15 +61,15 @@ type queue struct {
 	ctx context.Context
 }
 
-// RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting.
+// RunWorkQueue returns a started WorkQueue that has per-item exponential backoff rate-limiting.
 // When the passed in context is canceled, the queue shuts down.
-func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
-	q := &queue{
-		ratelimiter: NewRateLimiter(baseBackoff, maxBackoff),
-		dirty:       make(map[Request]struct{}),
-		processing:  make(map[Request]struct{}),
+func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] {
+	q := &queue[T]{
+		ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff),
+		dirty:       make(map[T]struct{}),
+		processing:  make(map[T]struct{}),
 		cond:        sync.NewCond(&sync.Mutex{}),
-		deferred:    NewDeferQueue(500 * time.Millisecond),
+		deferred:    NewDeferQueue[T](500 * time.Millisecond),
 		ctx:         ctx,
 	}
 	go q.start()
@@ -75,8 +78,8 @@ func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
 }
 
 // start begins the asynchronous processing loop for the deferral queue
-func (q *queue) start() {
-	go q.deferred.Process(q.ctx, func(item Request) {
+func (q *queue[T]) start() {
+	go q.deferred.Process(q.ctx, func(item T) {
 		q.Add(item)
 	})
 
@@ -85,7 +88,7 @@ func (q *queue) start() {
 }
 
 // shuttingDown returns whether the queue is in the process of shutting down
-func (q *queue) shuttingDown() bool {
+func (q *queue[T]) shuttingDown() bool {
 	select {
 	case <-q.ctx.Done():
 		return true
@@ -98,7 +101,7 @@ func (q *queue) shuttingDown() bool {
 // an item is available in the queue. If the returned shutdown parameter is true,
 // then the caller should stop using the queue. Any Requests returned by a call
 // to Get must be explicitly marked as processed via the Done method.
-func (q *queue) Get() (item Request, shutdown bool) {
+func (q *queue[T]) Get() (item T, shutdown bool) {
 	q.cond.L.Lock()
 	defer q.cond.L.Unlock()
 	for len(q.queue) == 0 && !q.shuttingDown() {
@@ -106,7 +109,8 @@ func (q *queue) Get() (item Request, shutdown bool) {
 	}
 	if len(q.queue) == 0 {
 		// We must be shutting down.
-		return Request{}, true
+		var zero T
+		return zero, true
 	}
 
 	item, q.queue = q.queue[0], q.queue[1:]
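
One detail forced by generics in the hunk above: the old code returned a `Request{}` literal on shutdown, but a generic function cannot write a composite literal for an arbitrary `T`, so the new code declares a zero value instead. The idiom in isolation:

```go
// Sketch: producing the zero value for a type parameter. A literal such
// as Request{} only works for a concrete struct type; var zero T works
// for any T.
func zeroOf[T comparable]() T {
	var zero T
	return zero
}
```
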
@@ -119,7 +123,7 @@ func (q *queue) Get() (item Request, shutdown bool) {
 
 // Add puts the given Request in the queue. If the Request is already in
 // the queue or the queue is stopping, then this is a no-op.
-func (q *queue) Add(item Request) {
+func (q *queue[T]) Add(item T) {
 	q.cond.L.Lock()
 	defer q.cond.L.Unlock()
 	if q.shuttingDown() {
@@ -139,7 +143,7 @@ func (q *queue) Add(item Request) {
 }
 
 // AddAfter adds a Request to the work queue after a given amount of time.
-func (q *queue) AddAfter(item Request, duration time.Duration) {
+func (q *queue[T]) AddAfter(item T, duration time.Duration) {
 	// don't add if we're already shutting down
 	if q.shuttingDown() {
 		return
@@ -156,18 +160,18 @@ func (q *queue) AddAfter(item Request, duration time.Duration) {
 
 // AddRateLimited adds the given Request to the queue after applying the
 // rate limiter to determine when the Request should next be processed.
-func (q *queue) AddRateLimited(item Request) {
+func (q *queue[T]) AddRateLimited(item T) {
 	q.AddAfter(item, q.ratelimiter.NextRetry(item))
 }
 
 // Forget signals the queue to reset the rate-limiting for the given Request.
-func (q *queue) Forget(item Request) {
+func (q *queue[T]) Forget(item T) {
 	q.ratelimiter.Forget(item)
 }
 
 // Done removes the item from the queue, if it has been marked dirty
 // again while being processed, it is re-added to the queue.
-func (q *queue) Done(item Request) {
+func (q *queue[T]) Done(item T) {
 	q.cond.L.Lock()
 	defer q.cond.L.Unlock()
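
Taken together, the generic queue keeps the client-go semantics it re-implements: `Get` blocks, every returned item must be released with `Done`, failures are requeued through the rate limiter, and successes reset it with `Forget`. A hedged consumer-loop sketch (reusing the illustrative `Task` type from the first sketch; the `process` callback is also illustrative):

```go
// Sketch: a worker draining a generic work queue.
func worker(q queue.WorkQueue[Task], process func(Task) error) {
	for {
		item, shutdown := q.Get() // blocks until an item or shutdown
		if shutdown {
			return
		}
		if err := process(item); err != nil {
			q.AddRateLimited(item) // requeue after the limiter's backoff
		} else {
			q.Forget(item) // reset this item's backoff state
		}
		q.Done(item) // release the item; required after every Get
	}
}
```
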
@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
 	"math"
@@ -14,18 +14,18 @@ import (
 
 // Limiter is an interface for a rate limiter that can limit
 // the number of retries processed in the work queue.
-type Limiter interface {
+type Limiter[T ItemType] interface {
 	// NextRetry returns the remaining time until the queue should
 	// reprocess a Request.
-	NextRetry(request Request) time.Duration
+	NextRetry(request T) time.Duration
 	// Forget causes the Limiter to reset the backoff for the Request.
-	Forget(request Request)
+	Forget(request T)
 }
 
-var _ Limiter = &ratelimiter{}
+var _ Limiter[string] = &ratelimiter[string]{}
 
-type ratelimiter struct {
-	failures map[Request]int
+type ratelimiter[T ItemType] struct {
+	failures map[T]int
 	base     time.Duration
 	max      time.Duration
 	mutex    sync.RWMutex
@@ -33,9 +33,9 @@ type ratelimiter struct {
 
 // NewRateLimiter returns a Limiter that does per-item exponential
 // backoff.
-func NewRateLimiter(base, max time.Duration) Limiter {
-	return &ratelimiter{
-		failures: make(map[Request]int),
+func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] {
+	return &ratelimiter[T]{
+		failures: make(map[T]int),
 		base:     base,
 		max:      max,
 	}
@@ -43,7 +43,7 @@ func NewRateLimiter(base, max time.Duration) Limiter {
 
 // NextRetry returns the remaining time until the queue should
 // reprocess a Request.
-func (r *ratelimiter) NextRetry(request Request) time.Duration {
+func (r *ratelimiter[T]) NextRetry(request T) time.Duration {
 	r.mutex.RLock()
 	defer r.mutex.RUnlock()
 
@@ -65,7 +65,7 @@ func (r *ratelimiter) NextRetry(request Request) time.Duration {
 }
 
 // Forget causes the Limiter to reset the backoff for the Request.
-func (r *ratelimiter) Forget(request Request) {
+func (r *ratelimiter[T]) Forget(request T) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
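
The limiter's behavior is unchanged by the generics: backoff grows exponentially per recorded failure for each item, is capped at the configured max, and is reset by `Forget` (the tests below pin this down). A hedged sketch of the expected progression; the exact values assume doubling per failure starting from the base, as the tests suggest:

```go
// Sketch: per-item exponential backoff and reset.
lim := queue.NewRateLimiter[string](1*time.Millisecond, 1*time.Second)
d1 := lim.NextRetry("key") // 1ms on the first failure
d2 := lim.NextRetry("key") // grows exponentially on repeat failures
lim.Forget("key")
d3 := lim.NextRetry("key") // back to the 1ms base after Forget
_, _, _ = d1, d2, d3
```
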
@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
 	"testing"
@@ -10,10 +10,12 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+type Request struct{ Kind string }
+
 func TestRateLimiter_Backoff(t *testing.T) {
 	t.Parallel()
 
-	limiter := NewRateLimiter(1*time.Millisecond, 1*time.Second)
+	limiter := NewRateLimiter[Request](1*time.Millisecond, 1*time.Second)
 
 	request := Request{Kind: "one"}
 	require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))
@@ -33,7 +35,7 @@ func TestRateLimiter_Backoff(t *testing.T) {
 func TestRateLimiter_Overflow(t *testing.T) {
 	t.Parallel()
 
-	limiter := NewRateLimiter(1*time.Millisecond, 1000*time.Second)
+	limiter := NewRateLimiter[Request](1*time.Millisecond, 1000*time.Second)
 
 	request := Request{Kind: "one"}
 	for i := 0; i < 5; i++ {
@@ -49,7 +51,7 @@ func TestRateLimiter_Overflow(t *testing.T) {
 	// make sure we're capped at the passed in max backoff
 	require.Equal(t, 1000*time.Second, limiter.NextRetry(overflow))
 
-	limiter = NewRateLimiter(1*time.Minute, 1000*time.Hour)
+	limiter = NewRateLimiter[Request](1*time.Minute, 1000*time.Hour)
 
 	for i := 0; i < 2; i++ {
 		limiter.NextRetry(request)
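
Note the small knock-on effect above: since the package no longer defines `Request`, the tests declare a local `type Request struct{ Kind string }` purely to instantiate the generics. Any comparable type would serve equally well, for example:

```go
// Sketch: instantiating the limiter with a plain string item instead of
// the test's local Request struct.
lim := NewRateLimiter[string](1*time.Millisecond, 1*time.Second)
_ = lim.NextRetry("some-key")
```
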
@@ -6,11 +6,13 @@ package controller
 import (
 	"sync/atomic"
 	"time"
+
+	"github.com/hashicorp/consul/agent/consul/controller/queue"
 )
 
-var _ WorkQueue = &countingWorkQueue{}
+var _ queue.WorkQueue[string] = &countingWorkQueue[string]{}
 
-type countingWorkQueue struct {
+type countingWorkQueue[T queue.ItemType] struct {
 	getCounter      uint64
 	addCounter      uint64
 	addAfterCounter uint64
@@ -18,16 +20,16 @@ type countingWorkQueue struct {
 	forgetCounter uint64
 	doneCounter   uint64
 
-	inner WorkQueue
+	inner queue.WorkQueue[T]
 }
 
-func newCountingWorkQueue(inner WorkQueue) *countingWorkQueue {
-	return &countingWorkQueue{
+func newCountingWorkQueue[T queue.ItemType](inner queue.WorkQueue[T]) *countingWorkQueue[T] {
+	return &countingWorkQueue[T]{
 		inner: inner,
 	}
 }
 
-func (c *countingWorkQueue) reset() {
+func (c *countingWorkQueue[T]) reset() {
 	atomic.StoreUint64(&c.getCounter, 0)
 	atomic.StoreUint64(&c.addCounter, 0)
 	atomic.StoreUint64(&c.addAfterCounter, 0)
@@ -36,61 +38,61 @@ func (c *countingWorkQueue) reset() {
 	atomic.StoreUint64(&c.doneCounter, 0)
 }
 
-func (c *countingWorkQueue) requeues() uint64 {
+func (c *countingWorkQueue[T]) requeues() uint64 {
 	return c.addAfters() + c.addRateLimiteds()
 }
 
-func (c *countingWorkQueue) Get() (item Request, shutdown bool) {
+func (c *countingWorkQueue[T]) Get() (item T, shutdown bool) {
 	item, err := c.inner.Get()
 	atomic.AddUint64(&c.getCounter, 1)
 	return item, err
 }
 
-func (c *countingWorkQueue) gets() uint64 {
+func (c *countingWorkQueue[T]) gets() uint64 {
 	return atomic.LoadUint64(&c.getCounter)
 }
 
-func (c *countingWorkQueue) Add(item Request) {
+func (c *countingWorkQueue[T]) Add(item T) {
 	c.inner.Add(item)
 	atomic.AddUint64(&c.addCounter, 1)
 }
 
-func (c *countingWorkQueue) adds() uint64 {
+func (c *countingWorkQueue[T]) adds() uint64 {
 	return atomic.LoadUint64(&c.addCounter)
 }
 
-func (c *countingWorkQueue) AddAfter(item Request, duration time.Duration) {
+func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration) {
 	c.inner.AddAfter(item, duration)
 	atomic.AddUint64(&c.addAfterCounter, 1)
 }
 
-func (c *countingWorkQueue) addAfters() uint64 {
+func (c *countingWorkQueue[T]) addAfters() uint64 {
 	return atomic.LoadUint64(&c.addAfterCounter)
 }
 
-func (c *countingWorkQueue) AddRateLimited(item Request) {
+func (c *countingWorkQueue[T]) AddRateLimited(item T) {
 	c.inner.AddRateLimited(item)
 	atomic.AddUint64(&c.addRateLimitedCounter, 1)
 }
 
-func (c *countingWorkQueue) addRateLimiteds() uint64 {
+func (c *countingWorkQueue[T]) addRateLimiteds() uint64 {
 	return atomic.LoadUint64(&c.addRateLimitedCounter)
 }
 
-func (c *countingWorkQueue) Forget(item Request) {
+func (c *countingWorkQueue[T]) Forget(item T) {
 	c.inner.Forget(item)
 	atomic.AddUint64(&c.forgetCounter, 1)
 }
 
-func (c *countingWorkQueue) forgets() uint64 {
+func (c *countingWorkQueue[T]) forgets() uint64 {
 	return atomic.LoadUint64(&c.forgetCounter)
 }
 
-func (c *countingWorkQueue) Done(item Request) {
+func (c *countingWorkQueue[T]) Done(item T) {
 	c.inner.Done(item)
 	atomic.AddUint64(&c.doneCounter, 1)
 }
 
-func (c *countingWorkQueue) dones() uint64 {
+func (c *countingWorkQueue[T]) dones() uint64 {
 	return atomic.LoadUint64(&c.doneCounter)
 }
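
The test helper above is a counting decorator; generics simply thread `T` through the wrapper. The same pattern in miniature (a hypothetical `loggingQueue` forwarding a single method; a real wrapper forwards them all, as `countingWorkQueue` does):

```go
// Sketch: a generic decorator over queue.WorkQueue[T].
type loggingQueue[T queue.ItemType] struct {
	inner queue.WorkQueue[T]
}

func (l *loggingQueue[T]) Add(item T) {
	fmt.Println("add:", item)
	l.inner.Add(item)
}
```
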
@@ -17,6 +17,7 @@ import (
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul/controller"
+	"github.com/hashicorp/consul/agent/consul/controller/queue"
 	"github.com/hashicorp/consul/agent/consul/fsm"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
@@ -4096,7 +4097,7 @@ func (n *noopController) Subscribe(request *stream.SubscribeRequest, transformer
 func (n *noopController) WithBackoff(base, max time.Duration) controller.Controller { return n }
 func (n *noopController) WithLogger(logger hclog.Logger) controller.Controller     { return n }
 func (n *noopController) WithWorkers(i int) controller.Controller                  { return n }
-func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) controller.WorkQueue) controller.Controller {
+func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[controller.Request]) controller.Controller {
 	return n
 }