diff --git a/agent/consul/controller/queue/defer.go b/agent/consul/controller/queue/defer.go index d6e261288..01666219c 100644 --- a/agent/consul/controller/queue/defer.go +++ b/agent/consul/controller/queue/defer.go @@ -41,7 +41,7 @@ type deferredRequest[T ItemType] struct { // future processing type deferQueue[T ItemType] struct { heap *deferHeap[T] - entries map[T]*deferredRequest[T] + entries map[string]*deferredRequest[T] addChannel chan *deferredRequest[T] heartbeat *time.Ticker @@ -55,7 +55,7 @@ func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] { return &deferQueue[T]{ heap: dHeap, - entries: make(map[T]*deferredRequest[T]), + entries: make(map[string]*deferredRequest[T]), addChannel: make(chan *deferredRequest[T]), heartbeat: time.NewTicker(tick), } @@ -78,7 +78,7 @@ func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) { // deferEntry adds a deferred request to the priority queue func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) { - existing, exists := q.entries[entry.item] + existing, exists := q.entries[entry.item.Key()] if exists { // insert or update the item deferral time if existing.enqueueAt.After(entry.enqueueAt) { @@ -90,7 +90,7 @@ func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) { } heap.Push(q.heap, entry) - q.entries[entry.item] = entry + q.entries[entry.item.Key()] = entry } // readyRequest returns a pointer to the next ready Request or @@ -108,7 +108,7 @@ func (q *deferQueue[T]) readyRequest() *T { } entry = heap.Pop(q.heap).(*deferredRequest[T]) - delete(q.entries, entry.item) + delete(q.entries, entry.item.Key()) return &entry.item } @@ -182,8 +182,6 @@ func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) { } } -var _ heap.Interface = &deferHeap[string]{} - // deferHeap implements heap.Interface type deferHeap[T ItemType] []*deferredRequest[T] diff --git a/agent/consul/controller/queue/queue.go b/agent/consul/controller/queue/queue.go index fd712b40a..6d9f0a657 100644 --- a/agent/consul/controller/queue/queue.go +++ b/agent/consul/controller/queue/queue.go @@ -13,7 +13,10 @@ import ( // https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go // ItemType is the type constraint for items in the WorkQueue. -type ItemType comparable +type ItemType interface { + // Key returns a string that will be used to de-duplicate items in the queue. + Key() string +} // WorkQueue is an interface for a work queue with semantics to help with // retries and rate limiting. @@ -43,9 +46,9 @@ type queue[T ItemType] struct { // dirty holds the working set of all Requests, whether they are being // processed or not - dirty map[T]struct{} + dirty map[string]struct{} // processing holds the set of current requests being processed - processing map[T]struct{} + processing map[string]struct{} // deferred is an internal priority queue that tracks deferred // Requests @@ -66,8 +69,8 @@ type queue[T ItemType] struct { func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] { q := &queue[T]{ ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff), - dirty: make(map[T]struct{}), - processing: make(map[T]struct{}), + dirty: make(map[string]struct{}), + processing: make(map[string]struct{}), cond: sync.NewCond(&sync.Mutex{}), deferred: NewDeferQueue[T](500 * time.Millisecond), ctx: ctx, @@ -115,8 +118,8 @@ func (q *queue[T]) Get() (item T, shutdown bool) { item, q.queue = q.queue[0], q.queue[1:] - q.processing[item] = struct{}{} - delete(q.dirty, item) + q.processing[item.Key()] = struct{}{} + delete(q.dirty, item.Key()) return item, false } @@ -129,12 +132,12 @@ func (q *queue[T]) Add(item T) { if q.shuttingDown() { return } - if _, ok := q.dirty[item]; ok { + if _, ok := q.dirty[item.Key()]; ok { return } - q.dirty[item] = struct{}{} - if _, ok := q.processing[item]; ok { + q.dirty[item.Key()] = struct{}{} + if _, ok := q.processing[item.Key()]; ok { return } @@ -175,8 +178,8 @@ func (q *queue[T]) Done(item T) { q.cond.L.Lock() defer q.cond.L.Unlock() - delete(q.processing, item) - if _, ok := q.dirty[item]; ok { + delete(q.processing, item.Key()) + if _, ok := q.dirty[item.Key()]; ok { q.queue = append(q.queue, item) q.cond.Signal() } diff --git a/agent/consul/controller/queue/rate.go b/agent/consul/controller/queue/rate.go index f4f0dc5ad..471601f85 100644 --- a/agent/consul/controller/queue/rate.go +++ b/agent/consul/controller/queue/rate.go @@ -22,10 +22,8 @@ type Limiter[T ItemType] interface { Forget(request T) } -var _ Limiter[string] = &ratelimiter[string]{} - type ratelimiter[T ItemType] struct { - failures map[T]int + failures map[string]int base time.Duration max time.Duration mutex sync.RWMutex @@ -35,7 +33,7 @@ type ratelimiter[T ItemType] struct { // backoff. func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] { return &ratelimiter[T]{ - failures: make(map[T]int), + failures: make(map[string]int), base: base, max: max, } @@ -47,8 +45,8 @@ func (r *ratelimiter[T]) NextRetry(request T) time.Duration { r.mutex.RLock() defer r.mutex.RUnlock() - exponent := r.failures[request] - r.failures[request] = r.failures[request] + 1 + exponent := r.failures[request.Key()] + r.failures[request.Key()] = r.failures[request.Key()] + 1 backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exponent)) // make sure we don't overflow time.Duration @@ -69,5 +67,5 @@ func (r *ratelimiter[T]) Forget(request T) { r.mutex.Lock() defer r.mutex.Unlock() - delete(r.failures, request) + delete(r.failures, request.Key()) } diff --git a/agent/consul/controller/queue/rate_test.go b/agent/consul/controller/queue/rate_test.go index f44df15e1..40dc54013 100644 --- a/agent/consul/controller/queue/rate_test.go +++ b/agent/consul/controller/queue/rate_test.go @@ -12,6 +12,8 @@ import ( type Request struct{ Kind string } +func (r Request) Key() string { return r.Kind } + func TestRateLimiter_Backoff(t *testing.T) { t.Parallel() diff --git a/agent/consul/controller/queue_test.go b/agent/consul/controller/queue_test.go index 867dfeff1..11e1bc82b 100644 --- a/agent/consul/controller/queue_test.go +++ b/agent/consul/controller/queue_test.go @@ -10,8 +10,6 @@ import ( "github.com/hashicorp/consul/agent/consul/controller/queue" ) -var _ queue.WorkQueue[string] = &countingWorkQueue[string]{} - type countingWorkQueue[T queue.ItemType] struct { getCounter uint64 addCounter uint64 diff --git a/agent/consul/controller/reconciler.go b/agent/consul/controller/reconciler.go index ce0c6e97a..dc4222508 100644 --- a/agent/consul/controller/reconciler.go +++ b/agent/consul/controller/reconciler.go @@ -20,6 +20,18 @@ type Request struct { Meta *acl.EnterpriseMeta } +// Key satisfies the queue.ItemType interface. It returns a string which will be +// used to de-duplicate requests in the queue. +func (r Request) Key() string { + return fmt.Sprintf( + `kind=%q,name=%q,part=%q,ns=%q`, + r.Kind, + r.Name, + r.Meta.PartitionOrDefault(), + r.Meta.NamespaceOrDefault(), + ) +} + // RequeueAfterError is an error that allows a Reconciler to override the // exponential backoff behavior of the Controller, rather than applying // the backoff algorithm, returning a RequeueAfterError will cause the diff --git a/internal/controller/api.go b/internal/controller/api.go index d258eb40d..7a2e89be4 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -127,6 +127,19 @@ type Request struct { ID *pbresource.ID } +// Key satisfies the queue.ItemType interface. It returns a string which will be +// used to de-duplicate requests in the queue. +func (r Request) Key() string { + return fmt.Sprintf( + "part=%q,peer=%q,ns=%q,name=%q,uid=%q", + r.ID.Tenancy.Partition, + r.ID.Tenancy.PeerName, + r.ID.Tenancy.Namespace, + r.ID.Name, + r.ID.Uid, + ) +} + // Runtime contains the dependencies required by reconcilers. type Runtime struct { Client pbresource.ResourceServiceClient diff --git a/internal/controller/controller.go b/internal/controller/controller.go index d99ca26f0..296ff5faf 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -43,11 +43,13 @@ func (c *controllerRunner) run(ctx context.Context) error { for _, watch := range c.ctrl.watches { watch := watch - mapQueue := runQueue[*pbresource.Resource](groupCtx, c.ctrl) + mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) // Watched Type Events → Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watch.watchedType, mapQueue.Add) + return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { + mapQueue.Add(mapperRequest{res: res}) + }) }) // Mapper Queue → Mapper → Reconciliation Queue @@ -96,13 +98,13 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func (c *controllerRunner) runMapper( ctx context.Context, w watch, - from queue.WorkQueue[*pbresource.Resource], + from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], ) error { logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) for { - res, shutdown := from.Get() + item, shutdown := from.Get() if shutdown { return nil } @@ -110,12 +112,12 @@ func (c *controllerRunner) runMapper( var reqs []Request err := c.handlePanic(func() error { var err error - reqs, err = w.mapper(ctx, c.runtime(), res) + reqs, err = w.mapper(ctx, c.runtime(), item.res) return err }) if err != nil { - from.AddRateLimited(res) - from.Done(res) + from.AddRateLimited(item) + from.Done(item) continue } @@ -130,8 +132,8 @@ func (c *controllerRunner) runMapper( to.Add(r) } - from.Forget(res) - from.Done(res) + from.Forget(item) + from.Done(item) } } @@ -183,3 +185,19 @@ func (c *controllerRunner) runtime() Runtime { Logger: c.logger, } } + +type mapperRequest struct{ res *pbresource.Resource } + +// Key satisfies the queue.ItemType interface. It returns a string which will be +// used to de-duplicate requests in the queue. +func (i mapperRequest) Key() string { + return fmt.Sprintf( + "type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q", + resource.ToGVK(i.res.Id.Type), + i.res.Id.Tenancy.Partition, + i.res.Id.Tenancy.PeerName, + i.res.Id.Tenancy.Namespace, + i.res.Id.Name, + i.res.Id.Uid, + ) +}