From 34786c71cd6afe7e43f590a62f838674b8949a65 Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Fri, 5 May 2023 15:38:22 +0100 Subject: [PATCH] controller: make the `WorkQueue` generic (#16982) --- agent/consul/controller/controller.go | 16 +++-- agent/consul/controller/controller_test.go | 13 ++-- agent/consul/controller/{ => queue}/defer.go | 68 +++++++++---------- agent/consul/controller/{ => queue}/queue.go | 66 +++++++++--------- agent/consul/controller/{ => queue}/rate.go | 24 +++---- .../controller/{ => queue}/rate_test.go | 10 +-- agent/consul/controller/queue_test.go | 40 +++++------ .../gateways/controller_gateways_test.go | 3 +- 8 files changed, 126 insertions(+), 114 deletions(-) rename agent/consul/controller/{ => queue}/defer.go (74%) rename agent/consul/controller/{ => queue}/queue.go (76%) rename agent/consul/controller/{ => queue}/rate.go (75%) rename agent/consul/controller/{ => queue}/rate_test.go (86%) diff --git a/agent/consul/controller/controller.go b/agent/consul/controller/controller.go index ddd99ea65..f8d6a50c7 100644 --- a/agent/consul/controller/controller.go +++ b/agent/consul/controller/controller.go @@ -11,11 +11,13 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/go-hclog" + "golang.org/x/sync/errgroup" + + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-hclog" - "golang.org/x/sync/errgroup" ) // much of this is a re-implementation of @@ -53,7 +55,7 @@ type Controller interface { // WithQueueFactory allows a Controller to replace its underlying work queue // implementation. This is most useful for testing. This should only ever be called // prior to running Run. - WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller + WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller // AddTrigger allows for triggering a reconciliation request when a // triggering function returns, when the passed in context is canceled // the trigger must return @@ -79,11 +81,11 @@ type controller struct { // makeQueue is the factory used for creating the work queue, generally // this shouldn't be touched, but can be updated for testing purposes - makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue + makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request] // workers is the number of workers to use to process data workers int // work is the internal work queue that pending Requests are added to - work WorkQueue + work queue.WorkQueue[Request] // baseBackoff is the starting backoff time for the work queue's rate limiter baseBackoff time.Duration // maxBackoff is the maximum backoff time for the work queue's rate limiter @@ -125,7 +127,7 @@ func New(publisher state.EventPublisher, reconciler Reconciler) Controller { workers: 1, baseBackoff: 5 * time.Millisecond, maxBackoff: 1000 * time.Second, - makeQueue: RunWorkQueue, + makeQueue: queue.RunWorkQueue[Request], started: make(chan struct{}), triggers: make(map[Request]func()), logger: hclog.NewNullLogger(), @@ -179,7 +181,7 @@ func (c *controller) WithLogger(logger hclog.Logger) Controller { // WithQueueFactory changes the initialization method for the Controller's work // queue, this is predominantly just used for testing. This should only ever be called // prior to running Start. -func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller { +func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller { c.ensureNotRunning() c.makeQueue = fn diff --git a/agent/consul/controller/controller_test.go b/agent/consul/controller/controller_test.go index 1f948c484..97d110222 100644 --- a/agent/consul/controller/controller_test.go +++ b/agent/consul/controller/controller_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" @@ -144,13 +145,13 @@ func TestBasicController_Retry(t *testing.T) { StorageBackend: fsm.NullStorageBackend, }).State() - queueInitialized := make(chan *countingWorkQueue) + queueInitialized := make(chan *countingWorkQueue[Request]) controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{ Topic: state.EventTopicIngressGateway, Subject: stream.SubjectWildcard, }).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond) - go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { - queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff)) + go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] { + queue := newCountingWorkQueue(queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff)) queueInitialized <- queue return queue }).Run(ctx) @@ -244,9 +245,9 @@ func TestBasicController_RunPanicAssertions(t *testing.T) { started := make(chan struct{}) reconciler := newTestReconciler(false) publisher := stream.NewEventPublisher(0) - controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { + controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] { close(started) - return RunWorkQueue(ctx, baseBackoff, maxBackoff) + return queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff) }) subscription := &stream.SubscribeRequest{ Topic: state.EventTopicIngressGateway, @@ -276,7 +277,7 @@ func TestBasicController_RunPanicAssertions(t *testing.T) { controller.WithWorkers(1) }) require.Panics(t, func() { - controller.WithQueueFactory(RunWorkQueue) + controller.WithQueueFactory(queue.RunWorkQueue[Request]) }) } diff --git a/agent/consul/controller/defer.go b/agent/consul/controller/queue/defer.go similarity index 74% rename from agent/consul/controller/defer.go rename to agent/consul/controller/queue/defer.go index f0c5af5f3..d6e261288 100644 --- a/agent/consul/controller/defer.go +++ b/agent/consul/controller/queue/defer.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package controller +package queue import ( "container/heap" @@ -14,24 +14,24 @@ import ( // DeferQueue is a generic priority queue implementation that // allows for deferring and later processing Requests. -type DeferQueue interface { +type DeferQueue[T ItemType] interface { // Defer defers processing a Request until a given time. When // the timeout is hit, the request will be processed by the // callback given in the Process loop. If the given context // is canceled, the item is not deferred. - Defer(ctx context.Context, item Request, until time.Time) + Defer(ctx context.Context, item T, until time.Time) // Process processes all items in the defer queue with the // given callback, blocking until the given context is canceled. // Callers should only ever call Process once, likely in a // long-lived goroutine. - Process(ctx context.Context, callback func(item Request)) + Process(ctx context.Context, callback func(item T)) } // deferredRequest is a wrapped Request with information about // when a retry should be attempted -type deferredRequest struct { +type deferredRequest[T ItemType] struct { enqueueAt time.Time - item Request + item T // index holds the index for the given heap entry so that if // the entry is updated the heap can be re-sorted index int @@ -39,24 +39,24 @@ type deferredRequest struct { // deferQueue is a priority queue for deferring Requests for // future processing -type deferQueue struct { - heap *deferHeap - entries map[Request]*deferredRequest +type deferQueue[T ItemType] struct { + heap *deferHeap[T] + entries map[T]*deferredRequest[T] - addChannel chan *deferredRequest + addChannel chan *deferredRequest[T] heartbeat *time.Ticker nextReadyTimer *time.Timer } // NewDeferQueue returns a priority queue for deferred Requests. -func NewDeferQueue(tick time.Duration) DeferQueue { - dHeap := &deferHeap{} +func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] { + dHeap := &deferHeap[T]{} heap.Init(dHeap) - return &deferQueue{ + return &deferQueue[T]{ heap: dHeap, - entries: make(map[Request]*deferredRequest), - addChannel: make(chan *deferredRequest), + entries: make(map[T]*deferredRequest[T]), + addChannel: make(chan *deferredRequest[T]), heartbeat: time.NewTicker(tick), } } @@ -64,8 +64,8 @@ func NewDeferQueue(tick time.Duration) DeferQueue { // Defer defers the given Request until the given time in the future. If the // passed in context is canceled before the Request is deferred, then this // immediately returns. -func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) { - entry := &deferredRequest{ +func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) { + entry := &deferredRequest[T]{ enqueueAt: until, item: item, } @@ -77,7 +77,7 @@ func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) { } // deferEntry adds a deferred request to the priority queue -func (q *deferQueue) deferEntry(entry *deferredRequest) { +func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) { existing, exists := q.entries[entry.item] if exists { // insert or update the item deferral time @@ -95,26 +95,26 @@ func (q *deferQueue) deferEntry(entry *deferredRequest) { // readyRequest returns a pointer to the next ready Request or // nil if no Requests are ready to be processed -func (q *deferQueue) readyRequest() *Request { +func (q *deferQueue[T]) readyRequest() *T { if q.heap.Len() == 0 { return nil } now := time.Now() - entry := q.heap.Peek().(*deferredRequest) + entry := q.heap.Peek().(*deferredRequest[T]) if entry.enqueueAt.After(now) { return nil } - entry = heap.Pop(q.heap).(*deferredRequest) + entry = heap.Pop(q.heap).(*deferredRequest[T]) delete(q.entries, entry.item) return &entry.item } // signalReady returns a timer signal to the next Request // that will be ready on the queue -func (q *deferQueue) signalReady() <-chan time.Time { +func (q *deferQueue[T]) signalReady() <-chan time.Time { if q.heap.Len() == 0 { return make(<-chan time.Time) } @@ -123,7 +123,7 @@ func (q *deferQueue) signalReady() <-chan time.Time { q.nextReadyTimer.Stop() } now := time.Now() - entry := q.heap.Peek().(*deferredRequest) + entry := q.heap.Peek().(*deferredRequest[T]) q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now)) return q.nextReadyTimer.C } @@ -132,7 +132,7 @@ func (q *deferQueue) signalReady() <-chan time.Time { // given callback, blocking until the given context is canceled. // Callers should only ever call Process once, likely in a // long-lived goroutine. -func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) { +func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) { for { ready := q.readyRequest() if ready != nil { @@ -156,7 +156,7 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) { // continue the loop, which process ready items case entry := <-q.addChannel: - enqueueOrProcess := func(entry *deferredRequest) { + enqueueOrProcess := func(entry *deferredRequest[T]) { now := time.Now() if entry.enqueueAt.After(now) { q.deferEntry(entry) @@ -182,38 +182,38 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) { } } -var _ heap.Interface = &deferHeap{} +var _ heap.Interface = &deferHeap[string]{} // deferHeap implements heap.Interface -type deferHeap []*deferredRequest +type deferHeap[T ItemType] []*deferredRequest[T] // Len returns the length of the heap. -func (h deferHeap) Len() int { +func (h deferHeap[T]) Len() int { return len(h) } // Less compares heap items for purposes of sorting. -func (h deferHeap) Less(i, j int) bool { +func (h deferHeap[T]) Less(i, j int) bool { return h[i].enqueueAt.Before(h[j].enqueueAt) } // Swap swaps two entries in the heap. -func (h deferHeap) Swap(i, j int) { +func (h deferHeap[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] h[i].index = i h[j].index = j } // Push pushes an entry onto the heap. -func (h *deferHeap) Push(x interface{}) { +func (h *deferHeap[T]) Push(x interface{}) { n := len(*h) - item := x.(*deferredRequest) + item := x.(*deferredRequest[T]) item.index = n *h = append(*h, item) } // Pop pops an entry off the heap. -func (h *deferHeap) Pop() interface{} { +func (h *deferHeap[T]) Pop() interface{} { n := len(*h) item := (*h)[n-1] item.index = -1 @@ -222,6 +222,6 @@ func (h *deferHeap) Pop() interface{} { } // Peek returns the next item on the heap. -func (h deferHeap) Peek() interface{} { +func (h deferHeap[T]) Peek() interface{} { return h[0] } diff --git a/agent/consul/controller/queue.go b/agent/consul/controller/queue/queue.go similarity index 76% rename from agent/consul/controller/queue.go rename to agent/consul/controller/queue/queue.go index ede7a00c0..fd712b40a 100644 --- a/agent/consul/controller/queue.go +++ b/agent/consul/controller/queue/queue.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package controller +package queue import ( "context" @@ -12,43 +12,46 @@ import ( // much of this is a re-implementation of // https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go +// ItemType is the type constraint for items in the WorkQueue. +type ItemType comparable + // WorkQueue is an interface for a work queue with semantics to help with // retries and rate limiting. -type WorkQueue interface { +type WorkQueue[T ItemType] interface { // Get retrieves the next Request in the queue, blocking until a Request is // available, if shutdown is true, then the queue is shutting down and should // no longer be used by the caller. - Get() (item Request, shutdown bool) + Get() (item T, shutdown bool) // Add immediately adds a Request to the work queue. - Add(item Request) + Add(item T) // AddAfter adds a Request to the work queue after a given amount of time. - AddAfter(item Request, duration time.Duration) + AddAfter(item T, duration time.Duration) // AddRateLimited adds a Request to the work queue after the amount of time // specified by applying the queue's rate limiter. - AddRateLimited(item Request) + AddRateLimited(item T) // Forget signals the queue to reset the rate-limiting for the given Request. - Forget(item Request) + Forget(item T) // Done tells the work queue that the Request has been successfully processed // and can be deleted from the queue. - Done(item Request) + Done(item T) } // queue implements a rate-limited work queue -type queue struct { +type queue[T ItemType] struct { // queue holds an ordered list of Requests needing to be processed - queue []Request + queue []T // dirty holds the working set of all Requests, whether they are being // processed or not - dirty map[Request]struct{} + dirty map[T]struct{} // processing holds the set of current requests being processed - processing map[Request]struct{} + processing map[T]struct{} // deferred is an internal priority queue that tracks deferred // Requests - deferred DeferQueue + deferred DeferQueue[T] // ratelimiter is the internal rate-limiter for the queue - ratelimiter Limiter + ratelimiter Limiter[T] // cond synchronizes queue access and handles signalling for when // data is available in the queue @@ -58,15 +61,15 @@ type queue struct { ctx context.Context } -// RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting. +// RunWorkQueue returns a started WorkQueue that has per-item exponential backoff rate-limiting. // When the passed in context is canceled, the queue shuts down. -func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { - q := &queue{ - ratelimiter: NewRateLimiter(baseBackoff, maxBackoff), - dirty: make(map[Request]struct{}), - processing: make(map[Request]struct{}), +func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] { + q := &queue[T]{ + ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff), + dirty: make(map[T]struct{}), + processing: make(map[T]struct{}), cond: sync.NewCond(&sync.Mutex{}), - deferred: NewDeferQueue(500 * time.Millisecond), + deferred: NewDeferQueue[T](500 * time.Millisecond), ctx: ctx, } go q.start() @@ -75,8 +78,8 @@ func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) Wo } // start begins the asynchronous processing loop for the deferral queue -func (q *queue) start() { - go q.deferred.Process(q.ctx, func(item Request) { +func (q *queue[T]) start() { + go q.deferred.Process(q.ctx, func(item T) { q.Add(item) }) @@ -85,7 +88,7 @@ func (q *queue) start() { } // shuttingDown returns whether the queue is in the process of shutting down -func (q *queue) shuttingDown() bool { +func (q *queue[T]) shuttingDown() bool { select { case <-q.ctx.Done(): return true @@ -98,7 +101,7 @@ func (q *queue) shuttingDown() bool { // an item is available in the queue. If the returned shutdown parameter is true, // then the caller should stop using the queue. Any Requests returned by a call // to Get must be explicitly marked as processed via the Done method. -func (q *queue) Get() (item Request, shutdown bool) { +func (q *queue[T]) Get() (item T, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() for len(q.queue) == 0 && !q.shuttingDown() { @@ -106,7 +109,8 @@ func (q *queue) Get() (item Request, shutdown bool) { } if len(q.queue) == 0 { // We must be shutting down. - return Request{}, true + var zero T + return zero, true } item, q.queue = q.queue[0], q.queue[1:] @@ -119,7 +123,7 @@ func (q *queue) Get() (item Request, shutdown bool) { // Add puts the given Request in the queue. If the Request is already in // the queue or the queue is stopping, then this is a no-op. -func (q *queue) Add(item Request) { +func (q *queue[T]) Add(item T) { q.cond.L.Lock() defer q.cond.L.Unlock() if q.shuttingDown() { @@ -139,7 +143,7 @@ func (q *queue) Add(item Request) { } // AddAfter adds a Request to the work queue after a given amount of time. -func (q *queue) AddAfter(item Request, duration time.Duration) { +func (q *queue[T]) AddAfter(item T, duration time.Duration) { // don't add if we're already shutting down if q.shuttingDown() { return @@ -156,18 +160,18 @@ func (q *queue) AddAfter(item Request, duration time.Duration) { // AddRateLimited adds the given Request to the queue after applying the // rate limiter to determine when the Request should next be processed. -func (q *queue) AddRateLimited(item Request) { +func (q *queue[T]) AddRateLimited(item T) { q.AddAfter(item, q.ratelimiter.NextRetry(item)) } // Forget signals the queue to reset the rate-limiting for the given Request. -func (q *queue) Forget(item Request) { +func (q *queue[T]) Forget(item T) { q.ratelimiter.Forget(item) } // Done removes the item from the queue, if it has been marked dirty // again while being processed, it is re-added to the queue. -func (q *queue) Done(item Request) { +func (q *queue[T]) Done(item T) { q.cond.L.Lock() defer q.cond.L.Unlock() diff --git a/agent/consul/controller/rate.go b/agent/consul/controller/queue/rate.go similarity index 75% rename from agent/consul/controller/rate.go rename to agent/consul/controller/queue/rate.go index 76bc48f31..f4f0dc5ad 100644 --- a/agent/consul/controller/rate.go +++ b/agent/consul/controller/queue/rate.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package controller +package queue import ( "math" @@ -14,18 +14,18 @@ import ( // Limiter is an interface for a rate limiter that can limit // the number of retries processed in the work queue. -type Limiter interface { +type Limiter[T ItemType] interface { // NextRetry returns the remaining time until the queue should // reprocess a Request. - NextRetry(request Request) time.Duration + NextRetry(request T) time.Duration // Forget causes the Limiter to reset the backoff for the Request. - Forget(request Request) + Forget(request T) } -var _ Limiter = &ratelimiter{} +var _ Limiter[string] = &ratelimiter[string]{} -type ratelimiter struct { - failures map[Request]int +type ratelimiter[T ItemType] struct { + failures map[T]int base time.Duration max time.Duration mutex sync.RWMutex @@ -33,9 +33,9 @@ type ratelimiter struct { // NewRateLimiter returns a Limiter that does per-item exponential // backoff. -func NewRateLimiter(base, max time.Duration) Limiter { - return &ratelimiter{ - failures: make(map[Request]int), +func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] { + return &ratelimiter[T]{ + failures: make(map[T]int), base: base, max: max, } @@ -43,7 +43,7 @@ func NewRateLimiter(base, max time.Duration) Limiter { // NextRetry returns the remaining time until the queue should // reprocess a Request. -func (r *ratelimiter) NextRetry(request Request) time.Duration { +func (r *ratelimiter[T]) NextRetry(request T) time.Duration { r.mutex.RLock() defer r.mutex.RUnlock() @@ -65,7 +65,7 @@ func (r *ratelimiter) NextRetry(request Request) time.Duration { } // Forget causes the Limiter to reset the backoff for the Request. -func (r *ratelimiter) Forget(request Request) { +func (r *ratelimiter[T]) Forget(request T) { r.mutex.Lock() defer r.mutex.Unlock() diff --git a/agent/consul/controller/rate_test.go b/agent/consul/controller/queue/rate_test.go similarity index 86% rename from agent/consul/controller/rate_test.go rename to agent/consul/controller/queue/rate_test.go index 676a28ee6..f44df15e1 100644 --- a/agent/consul/controller/rate_test.go +++ b/agent/consul/controller/queue/rate_test.go @@ -1,7 +1,7 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 -package controller +package queue import ( "testing" @@ -10,10 +10,12 @@ import ( "github.com/stretchr/testify/require" ) +type Request struct{ Kind string } + func TestRateLimiter_Backoff(t *testing.T) { t.Parallel() - limiter := NewRateLimiter(1*time.Millisecond, 1*time.Second) + limiter := NewRateLimiter[Request](1*time.Millisecond, 1*time.Second) request := Request{Kind: "one"} require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request)) @@ -33,7 +35,7 @@ func TestRateLimiter_Backoff(t *testing.T) { func TestRateLimiter_Overflow(t *testing.T) { t.Parallel() - limiter := NewRateLimiter(1*time.Millisecond, 1000*time.Second) + limiter := NewRateLimiter[Request](1*time.Millisecond, 1000*time.Second) request := Request{Kind: "one"} for i := 0; i < 5; i++ { @@ -49,7 +51,7 @@ func TestRateLimiter_Overflow(t *testing.T) { // make sure we're capped at the passed in max backoff require.Equal(t, 1000*time.Second, limiter.NextRetry(overflow)) - limiter = NewRateLimiter(1*time.Minute, 1000*time.Hour) + limiter = NewRateLimiter[Request](1*time.Minute, 1000*time.Hour) for i := 0; i < 2; i++ { limiter.NextRetry(request) diff --git a/agent/consul/controller/queue_test.go b/agent/consul/controller/queue_test.go index 63571511d..867dfeff1 100644 --- a/agent/consul/controller/queue_test.go +++ b/agent/consul/controller/queue_test.go @@ -6,11 +6,13 @@ package controller import ( "sync/atomic" "time" + + "github.com/hashicorp/consul/agent/consul/controller/queue" ) -var _ WorkQueue = &countingWorkQueue{} +var _ queue.WorkQueue[string] = &countingWorkQueue[string]{} -type countingWorkQueue struct { +type countingWorkQueue[T queue.ItemType] struct { getCounter uint64 addCounter uint64 addAfterCounter uint64 @@ -18,16 +20,16 @@ type countingWorkQueue struct { forgetCounter uint64 doneCounter uint64 - inner WorkQueue + inner queue.WorkQueue[T] } -func newCountingWorkQueue(inner WorkQueue) *countingWorkQueue { - return &countingWorkQueue{ +func newCountingWorkQueue[T queue.ItemType](inner queue.WorkQueue[T]) *countingWorkQueue[T] { + return &countingWorkQueue[T]{ inner: inner, } } -func (c *countingWorkQueue) reset() { +func (c *countingWorkQueue[T]) reset() { atomic.StoreUint64(&c.getCounter, 0) atomic.StoreUint64(&c.addCounter, 0) atomic.StoreUint64(&c.addAfterCounter, 0) @@ -36,61 +38,61 @@ func (c *countingWorkQueue) reset() { atomic.StoreUint64(&c.doneCounter, 0) } -func (c *countingWorkQueue) requeues() uint64 { +func (c *countingWorkQueue[T]) requeues() uint64 { return c.addAfters() + c.addRateLimiteds() } -func (c *countingWorkQueue) Get() (item Request, shutdown bool) { +func (c *countingWorkQueue[T]) Get() (item T, shutdown bool) { item, err := c.inner.Get() atomic.AddUint64(&c.getCounter, 1) return item, err } -func (c *countingWorkQueue) gets() uint64 { +func (c *countingWorkQueue[T]) gets() uint64 { return atomic.LoadUint64(&c.getCounter) } -func (c *countingWorkQueue) Add(item Request) { +func (c *countingWorkQueue[T]) Add(item T) { c.inner.Add(item) atomic.AddUint64(&c.addCounter, 1) } -func (c *countingWorkQueue) adds() uint64 { +func (c *countingWorkQueue[T]) adds() uint64 { return atomic.LoadUint64(&c.addCounter) } -func (c *countingWorkQueue) AddAfter(item Request, duration time.Duration) { +func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration) { c.inner.AddAfter(item, duration) atomic.AddUint64(&c.addAfterCounter, 1) } -func (c *countingWorkQueue) addAfters() uint64 { +func (c *countingWorkQueue[T]) addAfters() uint64 { return atomic.LoadUint64(&c.addAfterCounter) } -func (c *countingWorkQueue) AddRateLimited(item Request) { +func (c *countingWorkQueue[T]) AddRateLimited(item T) { c.inner.AddRateLimited(item) atomic.AddUint64(&c.addRateLimitedCounter, 1) } -func (c *countingWorkQueue) addRateLimiteds() uint64 { +func (c *countingWorkQueue[T]) addRateLimiteds() uint64 { return atomic.LoadUint64(&c.addRateLimitedCounter) } -func (c *countingWorkQueue) Forget(item Request) { +func (c *countingWorkQueue[T]) Forget(item T) { c.inner.Forget(item) atomic.AddUint64(&c.forgetCounter, 1) } -func (c *countingWorkQueue) forgets() uint64 { +func (c *countingWorkQueue[T]) forgets() uint64 { return atomic.LoadUint64(&c.forgetCounter) } -func (c *countingWorkQueue) Done(item Request) { +func (c *countingWorkQueue[T]) Done(item T) { c.inner.Done(item) atomic.AddUint64(&c.doneCounter, 1) } -func (c *countingWorkQueue) dones() uint64 { +func (c *countingWorkQueue[T]) dones() uint64 { return atomic.LoadUint64(&c.doneCounter) } diff --git a/agent/consul/gateways/controller_gateways_test.go b/agent/consul/gateways/controller_gateways_test.go index d88c7a96b..07d85357a 100644 --- a/agent/consul/gateways/controller_gateways_test.go +++ b/agent/consul/gateways/controller_gateways_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/controller" + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" @@ -4096,7 +4097,7 @@ func (n *noopController) Subscribe(request *stream.SubscribeRequest, transformer func (n *noopController) WithBackoff(base, max time.Duration) controller.Controller { return n } func (n *noopController) WithLogger(logger hclog.Logger) controller.Controller { return n } func (n *noopController) WithWorkers(i int) controller.Controller { return n } -func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) controller.WorkQueue) controller.Controller { +func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[controller.Request]) controller.Controller { return n }