controller: deduplicate items in queue (#17168)

This commit is contained in:
Dan Upton 2023-05-09 18:14:20 +01:00 committed by GitHub
parent 93a906d2a4
commit 91f76b6fb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 37 deletions

View File

@ -41,7 +41,7 @@ type deferredRequest[T ItemType] struct {
// future processing
type deferQueue[T ItemType] struct {
heap *deferHeap[T]
entries map[T]*deferredRequest[T]
entries map[string]*deferredRequest[T]
addChannel chan *deferredRequest[T]
heartbeat *time.Ticker
@ -55,7 +55,7 @@ func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {
return &deferQueue[T]{
heap: dHeap,
entries: make(map[T]*deferredRequest[T]),
entries: make(map[string]*deferredRequest[T]),
addChannel: make(chan *deferredRequest[T]),
heartbeat: time.NewTicker(tick),
}
@ -78,7 +78,7 @@ func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
// deferEntry adds a deferred request to the priority queue
func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
existing, exists := q.entries[entry.item]
existing, exists := q.entries[entry.item.Key()]
if exists {
// insert or update the item deferral time
if existing.enqueueAt.After(entry.enqueueAt) {
@ -90,7 +90,7 @@ func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
}
heap.Push(q.heap, entry)
q.entries[entry.item] = entry
q.entries[entry.item.Key()] = entry
}
// readyRequest returns a pointer to the next ready Request or
@ -108,7 +108,7 @@ func (q *deferQueue[T]) readyRequest() *T {
}
entry = heap.Pop(q.heap).(*deferredRequest[T])
delete(q.entries, entry.item)
delete(q.entries, entry.item.Key())
return &entry.item
}
@ -182,8 +182,6 @@ func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) {
}
}
var _ heap.Interface = &deferHeap[string]{}
// deferHeap implements heap.Interface
type deferHeap[T ItemType] []*deferredRequest[T]

View File

@ -13,7 +13,10 @@ import (
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go
// ItemType is the type constraint for items in the WorkQueue.
type ItemType comparable
type ItemType interface {
// Key returns a string that will be used to de-duplicate items in the queue.
Key() string
}
// WorkQueue is an interface for a work queue with semantics to help with
// retries and rate limiting.
@ -43,9 +46,9 @@ type queue[T ItemType] struct {
// dirty holds the working set of all Requests, whether they are being
// processed or not
dirty map[T]struct{}
dirty map[string]struct{}
// processing holds the set of current requests being processed
processing map[T]struct{}
processing map[string]struct{}
// deferred is an internal priority queue that tracks deferred
// Requests
@ -66,8 +69,8 @@ type queue[T ItemType] struct {
func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] {
q := &queue[T]{
ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff),
dirty: make(map[T]struct{}),
processing: make(map[T]struct{}),
dirty: make(map[string]struct{}),
processing: make(map[string]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
deferred: NewDeferQueue[T](500 * time.Millisecond),
ctx: ctx,
@ -115,8 +118,8 @@ func (q *queue[T]) Get() (item T, shutdown bool) {
item, q.queue = q.queue[0], q.queue[1:]
q.processing[item] = struct{}{}
delete(q.dirty, item)
q.processing[item.Key()] = struct{}{}
delete(q.dirty, item.Key())
return item, false
}
@ -129,12 +132,12 @@ func (q *queue[T]) Add(item T) {
if q.shuttingDown() {
return
}
if _, ok := q.dirty[item]; ok {
if _, ok := q.dirty[item.Key()]; ok {
return
}
q.dirty[item] = struct{}{}
if _, ok := q.processing[item]; ok {
q.dirty[item.Key()] = struct{}{}
if _, ok := q.processing[item.Key()]; ok {
return
}
@ -175,8 +178,8 @@ func (q *queue[T]) Done(item T) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
delete(q.processing, item)
if _, ok := q.dirty[item]; ok {
delete(q.processing, item.Key())
if _, ok := q.dirty[item.Key()]; ok {
q.queue = append(q.queue, item)
q.cond.Signal()
}

View File

@ -22,10 +22,8 @@ type Limiter[T ItemType] interface {
Forget(request T)
}
var _ Limiter[string] = &ratelimiter[string]{}
type ratelimiter[T ItemType] struct {
failures map[T]int
failures map[string]int
base time.Duration
max time.Duration
mutex sync.RWMutex
@ -35,7 +33,7 @@ type ratelimiter[T ItemType] struct {
// backoff.
func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] {
return &ratelimiter[T]{
failures: make(map[T]int),
failures: make(map[string]int),
base: base,
max: max,
}
@ -47,8 +45,8 @@ func (r *ratelimiter[T]) NextRetry(request T) time.Duration {
r.mutex.RLock()
defer r.mutex.RUnlock()
exponent := r.failures[request]
r.failures[request] = r.failures[request] + 1
exponent := r.failures[request.Key()]
r.failures[request.Key()] = r.failures[request.Key()] + 1
backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exponent))
// make sure we don't overflow time.Duration
@ -69,5 +67,5 @@ func (r *ratelimiter[T]) Forget(request T) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.failures, request)
delete(r.failures, request.Key())
}

View File

@ -12,6 +12,8 @@ import (
type Request struct{ Kind string }
func (r Request) Key() string { return r.Kind }
func TestRateLimiter_Backoff(t *testing.T) {
t.Parallel()

View File

@ -10,8 +10,6 @@ import (
"github.com/hashicorp/consul/agent/consul/controller/queue"
)
var _ queue.WorkQueue[string] = &countingWorkQueue[string]{}
type countingWorkQueue[T queue.ItemType] struct {
getCounter uint64
addCounter uint64

View File

@ -20,6 +20,18 @@ type Request struct {
Meta *acl.EnterpriseMeta
}
// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (r Request) Key() string {
return fmt.Sprintf(
`kind=%q,name=%q,part=%q,ns=%q`,
r.Kind,
r.Name,
r.Meta.PartitionOrDefault(),
r.Meta.NamespaceOrDefault(),
)
}
// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller, rather than applying
// the backoff algorithm, returning a RequeueAfterError will cause the

View File

@ -127,6 +127,19 @@ type Request struct {
ID *pbresource.ID
}
// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (r Request) Key() string {
return fmt.Sprintf(
"part=%q,peer=%q,ns=%q,name=%q,uid=%q",
r.ID.Tenancy.Partition,
r.ID.Tenancy.PeerName,
r.ID.Tenancy.Namespace,
r.ID.Name,
r.ID.Uid,
)
}
// Runtime contains the dependencies required by reconcilers.
type Runtime struct {
Client pbresource.ResourceServiceClient

View File

@ -43,11 +43,13 @@ func (c *controllerRunner) run(ctx context.Context) error {
for _, watch := range c.ctrl.watches {
watch := watch
mapQueue := runQueue[*pbresource.Resource](groupCtx, c.ctrl)
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, mapQueue.Add)
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})
// Mapper Queue → Mapper → Reconciliation Queue
@ -96,13 +98,13 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
func (c *controllerRunner) runMapper(
ctx context.Context,
w watch,
from queue.WorkQueue[*pbresource.Resource],
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
for {
res, shutdown := from.Get()
item, shutdown := from.Get()
if shutdown {
return nil
}
@ -110,12 +112,12 @@ func (c *controllerRunner) runMapper(
var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), res)
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
from.AddRateLimited(res)
from.Done(res)
from.AddRateLimited(item)
from.Done(item)
continue
}
@ -130,8 +132,8 @@ func (c *controllerRunner) runMapper(
to.Add(r)
}
from.Forget(res)
from.Done(res)
from.Forget(item)
from.Done(item)
}
}
@ -183,3 +185,19 @@ func (c *controllerRunner) runtime() Runtime {
Logger: c.logger,
}
}
type mapperRequest struct{ res *pbresource.Resource }
// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (i mapperRequest) Key() string {
return fmt.Sprintf(
"type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q",
resource.ToGVK(i.res.Id.Type),
i.res.Id.Tenancy.Partition,
i.res.Id.Tenancy.PeerName,
i.res.Id.Tenancy.Namespace,
i.res.Id.Name,
i.res.Id.Uid,
)
}