Add async reconciliation controller subpackage (#15534)

* Add async reconciliation controller subpackage

* Address initial feedback

* Add tests for panic assertions

* Fix comment
This commit is contained in:
Andrew Stucki 2022-12-16 16:49:26 -05:00 committed by GitHub
parent a1ceeff461
commit 1ff0906a3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1309 additions and 0 deletions

View File

@ -0,0 +1,289 @@
package controller
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/sync/errgroup"
)
// much of this is a re-implementation of
// https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.13/pkg/internal/controller/controller.go
// Transformer is a function that takes one type of config entry that has changed
// and transforms that into a set of reconciliation requests to enqueue.
type Transformer func(entry structs.ConfigEntry) []Request
// Controller subscribes to a set of watched resources from the
// state store and delegates processing them to a given Reconciler.
// If a Reconciler errors while processing a Request, then the
// Controller handles rescheduling the Request to be re-processed.
type Controller interface {
// Run begins the Controller's main processing loop. When the given
// context is canceled, the Controller stops processing any remaining work.
// The Run function should only ever be called once.
Run(ctx context.Context) error
// Subscribe tells the controller to subscribe to updates for config entries based
// on the given request. Optional transformation functions can also be passed in
// to Subscribe, allowing a controller to map a config entry to a different type of
// request under the hood (i.e. watching a dependency and triggering a Reconcile on
// the dependent resource). This should only ever be called prior to calling Run.
Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller
// WithBackoff changes the base and maximum backoff values for the Controller's
// Request retry rate limiter. This should only ever be called prior to
// running Run.
WithBackoff(base, max time.Duration) Controller
// WithWorkers sets the number of worker goroutines used to process the queue
// this defaults to 1 goroutine.
WithWorkers(i int) Controller
// WithQueueFactory allows a Controller to replace its underlying work queue
// implementation. This is most useful for testing. This should only ever be called
// prior to running Run.
WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
}
var _ Controller = &controller{}
type subscription struct {
request *stream.SubscribeRequest
transformers []Transformer
}
// controller implements the Controller interface
type controller struct {
// reconciler is the Reconciler that processes all subscribed
// Requests
reconciler Reconciler
// makeQueue is the factory used for creating the work queue, generally
// this shouldn't be touched, but can be updated for testing purposes
makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue
// workers is the number of workers to use to process data
workers int
// work is the internal work queue that pending Requests are added to
work WorkQueue
// baseBackoff is the starting backoff time for the work queue's rate limiter
baseBackoff time.Duration
// maxBackoff is the maximum backoff time for the work queue's rate limiter
maxBackoff time.Duration
// subscriptions is a list of subscription requests for retrieving configuration entries
subscriptions []subscription
// publisher is the event publisher that should be subscribed to for any updates
publisher state.EventPublisher
// running ensures that we are only calling Run a single time
running int32
}
// New returns a new Controller associated with the given state store and reconciler.
func New(publisher state.EventPublisher, reconciler Reconciler) Controller {
return &controller{
reconciler: reconciler,
publisher: publisher,
workers: 1,
baseBackoff: 5 * time.Millisecond,
maxBackoff: 1000 * time.Second,
makeQueue: RunWorkQueue,
}
}
// Subscribe tells the controller to subscribe to updates for config entries of the
// given kind and with the associated enterprise metadata. This should only ever be
// called prior to running Start.
func (c *controller) Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller {
c.ensureNotRunning()
c.subscriptions = append(c.subscriptions, subscription{
request: request,
transformers: transformers,
})
return c
}
// WithBackoff changes the base and maximum backoff values for the Controller's
// Request retry rate limiter. This should only ever be called prior to
// running Start.
func (c *controller) WithBackoff(base, max time.Duration) Controller {
c.ensureNotRunning()
c.baseBackoff = base
c.maxBackoff = max
return c
}
// WithWorkers sets the number of worker goroutines used to process the queue
// this defaults to 1 goroutine.
func (c *controller) WithWorkers(i int) Controller {
c.ensureNotRunning()
if i <= 0 {
i = 1
}
c.workers = i
return c
}
// WithQueueFactory changes the initialization method for the Controller's work
// queue, this is predominantly just used for testing. This should only ever be called
// prior to running Start.
func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller {
c.ensureNotRunning()
c.makeQueue = fn
return c
}
// ensureNotRunning makes sure we aren't trying to reconfigure an already
// running controller, it panics if Run has already been invoked
func (c *controller) ensureNotRunning() {
if atomic.LoadInt32(&c.running) == 1 {
panic("cannot configure controller once Run is called")
}
}
// Run begins the Controller's main processing loop. When the given
// context is canceled, the Controller stops processing any remaining work.
// The Run function should only ever be called once, calling it multiple
// times will result in a panic.
func (c *controller) Run(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&c.running, 0, 1) {
panic("Run cannot be called more than once")
}
group, groupCtx := errgroup.WithContext(ctx)
// set up our queue
c.work = c.makeQueue(groupCtx, c.baseBackoff, c.maxBackoff)
for _, sub := range c.subscriptions {
// store a reference for the closure
sub := sub
group.Go(func() error {
var index uint64
subscription, err := c.publisher.Subscribe(sub.request)
if err != nil {
return err
}
defer subscription.Unsubscribe()
for {
event, err := subscription.Next(ctx)
switch {
case errors.Is(err, context.Canceled):
return nil
case err != nil:
return err
}
if event.IsFramingEvent() {
continue
}
if event.Index <= index {
continue
}
index = event.Index
if err := c.processEvent(sub, event); err != nil {
return err
}
}
})
}
for i := 0; i < c.workers; i++ {
group.Go(func() error {
for {
request, shutdown := c.work.Get()
if shutdown {
// Stop working
return nil
}
c.reconcileHandler(groupCtx, request)
// Done is called here because it is required to be called
// when we've finished processing each request
c.work.Done(request)
}
})
}
<-groupCtx.Done()
return nil
}
func (c *controller) processEvent(sub subscription, event stream.Event) error {
switch payload := event.Payload.(type) {
case state.EventPayloadConfigEntry:
c.enqueueEntry(payload.Value, sub.transformers...)
return nil
case *stream.PayloadEvents:
for _, event := range payload.Items {
if err := c.processEvent(sub, event); err != nil {
return err
}
}
return nil
default:
return fmt.Errorf("unhandled event type: %T", payload)
}
}
// enqueueEntry adds all of the given entry into the work queue. If given
// one or more transformation functions, it will enqueue all of the resulting
// reconciliation requests returned from each Transformer.
func (c *controller) enqueueEntry(entry structs.ConfigEntry, transformers ...Transformer) {
if len(transformers) == 0 {
c.work.Add(Request{
Kind: entry.GetKind(),
Name: entry.GetName(),
Meta: entry.GetEnterpriseMeta(),
})
} else {
for _, fn := range transformers {
for _, request := range fn(entry) {
c.work.Add(request)
}
}
}
}
// reconcile wraps the reconciler in a panic handler
func (c *controller) reconcile(ctx context.Context, req Request) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic [recovered]: %v", r)
return
}
}()
return c.reconciler.Reconcile(ctx, req)
}
// reconcileHandler invokes the reconciler and looks at its return value
// to determine whether the request should be rescheduled
func (c *controller) reconcileHandler(ctx context.Context, req Request) {
if err := c.reconcile(ctx, req); err != nil {
// handle the case where we're specifically told to requeue later
var requeueAfter RequeueAfterError
if errors.As(err, &requeueAfter) {
c.work.Forget(req)
c.work.AddAfter(req, time.Duration(requeueAfter))
return
}
// fallback to rate limit ourselves
c.work.AddRateLimited(req)
return
}
// if no error then Forget this request so it is not retried
c.work.Forget(req)
}

View File

@ -0,0 +1,273 @@
package controller
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestBasicController(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}))
}
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)
received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, structs.IngressGateway, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
// since we only modified each entry once, we should have exactly 200 reconcliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
}
func TestBasicController_Transform(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}, func(entry structs.ConfigEntry) []Request {
return []Request{{
Kind: "foo",
Name: "bar",
}}
}).Run(ctx)
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "test",
}))
select {
case request := <-reconciler.received:
require.Equal(t, "foo", request.Kind)
require.Equal(t, "bar", request.Name)
case <-ctx.Done():
t.Fatal("stopped reconciler before event received")
}
}
func TestBasicController_Retry(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reconciler := newTestReconciler(true)
defer reconciler.stop()
publisher := stream.NewEventPublisher(0)
go publisher.Run(ctx)
// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()
queueInitialized := make(chan *countingWorkQueue)
controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff))
queueInitialized <- queue
return queue
}).Run(ctx)
queue := <-queueInitialized
ensureCalled := func(request chan Request, name string) bool {
// give a short window for our counters to update
defer time.Sleep(10 * time.Millisecond)
select {
case req := <-request:
require.Equal(t, structs.IngressGateway, req.Kind)
require.Equal(t, name, req.Name)
return true
case <-time.After(10 * time.Millisecond):
return false
}
}
// check to make sure we are called once
queue.reset()
require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-1",
}))
require.False(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-1"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 0, queue.requeues())
// check that we requeue when an arbitrary error occurs
queue.reset()
reconciler.setResponse(errors.New("error"))
require.NoError(t, store.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-2",
}))
require.False(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-2"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addRateLimiteds())
// check that we requeue at a given rate when using a RequeueAfterError
queue.reset()
reconciler.setResponse(RequeueNow())
require.NoError(t, store.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "foo-3",
}))
require.False(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 0, queue.dones())
require.EqualValues(t, 0, queue.requeues())
require.EqualValues(t, 0, queue.addRateLimiteds())
reconciler.step()
// check we're processed the first time and re-queued
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 1, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
// now make sure we succeed
reconciler.setResponse(nil)
reconciler.step()
require.True(t, ensureCalled(reconciler.received, "foo-3"))
require.EqualValues(t, 2, queue.dones())
require.EqualValues(t, 1, queue.requeues())
require.EqualValues(t, 1, queue.addAfters())
}
func TestBasicController_RunPanicAssertions(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
started := make(chan struct{})
reconciler := newTestReconciler(false)
publisher := stream.NewEventPublisher(0)
controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
close(started)
return RunWorkQueue(ctx, baseBackoff, maxBackoff)
})
subscription := &stream.SubscribeRequest{
Topic: state.EventTopicIngressGateway,
Subject: stream.SubjectWildcard,
}
// kick off the controller
go controller.Subscribe(subscription).Run(ctx)
// wait to make sure the following assertions don't
// get run before the above goroutine is spawned
<-started
// make sure we can't call Run again
require.Panics(t, func() {
controller.Run(ctx)
})
// make sure all of our configuration methods panic
require.Panics(t, func() {
controller.Subscribe(subscription)
})
require.Panics(t, func() {
controller.WithBackoff(1, 10)
})
require.Panics(t, func() {
controller.WithWorkers(1)
})
require.Panics(t, func() {
controller.WithQueueFactory(RunWorkQueue)
})
}

View File

@ -0,0 +1,224 @@
package controller
import (
"container/heap"
"context"
"time"
)
// much of this is a re-implementation of
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/delaying_queue.go
// DeferQueue is a generic priority queue implementation that
// allows for deferring and later processing Requests.
type DeferQueue interface {
// Defer defers processing a Request until a given time. When
// the timeout is hit, the request will be processed by the
// callback given in the Process loop. If the given context
// is canceled, the item is not deferred.
Defer(ctx context.Context, item Request, until time.Time)
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
// long-lived goroutine.
Process(ctx context.Context, callback func(item Request))
}
// deferredRequest is a wrapped Request with information about
// when a retry should be attempted
type deferredRequest struct {
enqueueAt time.Time
item Request
// index holds the index for the given heap entry so that if
// the entry is updated the heap can be re-sorted
index int
}
// deferQueue is a priority queue for deferring Requests for
// future processing
type deferQueue struct {
heap *deferHeap
entries map[Request]*deferredRequest
addChannel chan *deferredRequest
heartbeat *time.Ticker
nextReadyTimer *time.Timer
}
// NewDeferQueue returns a priority queue for deferred Requests.
func NewDeferQueue(tick time.Duration) DeferQueue {
dHeap := &deferHeap{}
heap.Init(dHeap)
return &deferQueue{
heap: dHeap,
entries: make(map[Request]*deferredRequest),
addChannel: make(chan *deferredRequest),
heartbeat: time.NewTicker(tick),
}
}
// Defer defers the given Request until the given time in the future. If the
// passed in context is canceled before the Request is deferred, then this
// immediately returns.
func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
entry := &deferredRequest{
enqueueAt: until,
item: item,
}
select {
case <-ctx.Done():
case q.addChannel <- entry:
}
}
// deferEntry adds a deferred request to the priority queue
func (q *deferQueue) deferEntry(entry *deferredRequest) {
existing, exists := q.entries[entry.item]
if exists {
// insert or update the item deferral time
if existing.enqueueAt.After(entry.enqueueAt) {
existing.enqueueAt = entry.enqueueAt
heap.Fix(q.heap, existing.index)
}
return
}
heap.Push(q.heap, entry)
q.entries[entry.item] = entry
}
// readyRequest returns a pointer to the next ready Request or
// nil if no Requests are ready to be processed
func (q *deferQueue) readyRequest() *Request {
if q.heap.Len() == 0 {
return nil
}
now := time.Now()
entry := q.heap.Peek().(*deferredRequest)
if entry.enqueueAt.After(now) {
return nil
}
entry = heap.Pop(q.heap).(*deferredRequest)
delete(q.entries, entry.item)
return &entry.item
}
// signalReady returns a timer signal to the next Request
// that will be ready on the queue
func (q *deferQueue) signalReady() <-chan time.Time {
if q.heap.Len() == 0 {
return make(<-chan time.Time)
}
if q.nextReadyTimer != nil {
q.nextReadyTimer.Stop()
}
now := time.Now()
entry := q.heap.Peek().(*deferredRequest)
q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now))
return q.nextReadyTimer.C
}
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
// long-lived goroutine.
func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
for {
ready := q.readyRequest()
if ready != nil {
callback(*ready)
}
signalReady := q.signalReady()
select {
case <-ctx.Done():
if q.nextReadyTimer != nil {
q.nextReadyTimer.Stop()
}
q.heartbeat.Stop()
return
case <-q.heartbeat.C:
// continue the loop, which process ready items
case <-signalReady:
// continue the loop, which process ready items
case entry := <-q.addChannel:
enqueueOrProcess := func(entry *deferredRequest) {
now := time.Now()
if entry.enqueueAt.After(now) {
q.deferEntry(entry)
} else {
// fast-path, process immediately if we don't need to defer
callback(entry.item)
}
}
enqueueOrProcess(entry)
// drain the add channel before we do anything else
drained := false
for !drained {
select {
case entry := <-q.addChannel:
enqueueOrProcess(entry)
default:
drained = true
}
}
}
}
}
var _ heap.Interface = &deferHeap{}
// deferHeap implements heap.Interface
type deferHeap []*deferredRequest
// Len returns the length of the heap.
func (h deferHeap) Len() int {
return len(h)
}
// Less compares heap items for purposes of sorting.
func (h deferHeap) Less(i, j int) bool {
return h[i].enqueueAt.Before(h[j].enqueueAt)
}
// Swap swaps two entries in the heap.
func (h deferHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
// Push pushes an entry onto the heap.
func (h *deferHeap) Push(x interface{}) {
n := len(*h)
item := x.(*deferredRequest)
item.index = n
*h = append(*h, item)
}
// Pop pops an entry off the heap.
func (h *deferHeap) Pop() interface{} {
n := len(*h)
item := (*h)[n-1]
item.index = -1
*h = (*h)[0:(n - 1)]
return item
}
// Peek returns the next item on the heap.
func (h deferHeap) Peek() interface{} {
return h[0]
}

View File

@ -0,0 +1,10 @@
// Package controller contains a re-implementation of the Kubernetes
// [controller-runtime](https://github.com/kubernetes-sigs/controller-runtime)
// with the core using Consul's event publishing pipeline rather than
// Kubernetes' client list/watch APIs.
//
// Generally this package enables defining asynchronous control loops
// meant to be run on a Consul cluster's leader that reconcile derived state
// in config entries that might be dependent on multiple sources.
package controller

View File

@ -0,0 +1,176 @@
package controller
import (
"context"
"sync"
"time"
)
// much of this is a re-implementation of
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go
// WorkQueue is an interface for a work queue with semantics to help with
// retries and rate limiting.
type WorkQueue interface {
// Get retrieves the next Request in the queue, blocking until a Request is
// available, if shutdown is true, then the queue is shutting down and should
// no longer be used by the caller.
Get() (item Request, shutdown bool)
// Add immediately adds a Request to the work queue.
Add(item Request)
// AddAfter adds a Request to the work queue after a given amount of time.
AddAfter(item Request, duration time.Duration)
// AddRateLimited adds a Request to the work queue after the amount of time
// specified by applying the queue's rate limiter.
AddRateLimited(item Request)
// Forget signals the queue to reset the rate-limiting for the given Request.
Forget(item Request)
// Done tells the work queue that the Request has been successfully processed
// and can be deleted from the queue.
Done(item Request)
}
// queue implements a rate-limited work queue
type queue struct {
// queue holds an ordered list of Requests needing to be processed
queue []Request
// dirty holds the working set of all Requests, whether they are being
// processed or not
dirty map[Request]struct{}
// processing holds the set of current requests being processed
processing map[Request]struct{}
// deferred is an internal priority queue that tracks deferred
// Requests
deferred DeferQueue
// ratelimiter is the internal rate-limiter for the queue
ratelimiter Limiter
// cond synchronizes queue access and handles signalling for when
// data is available in the queue
cond *sync.Cond
// ctx is the top-level context that, when canceled, shuts down the queue
ctx context.Context
}
// RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting.
// When the passed in context is canceled, the queue shuts down.
func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
q := &queue{
ratelimiter: NewRateLimiter(baseBackoff, maxBackoff),
dirty: make(map[Request]struct{}),
processing: make(map[Request]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
deferred: NewDeferQueue(500 * time.Millisecond),
ctx: ctx,
}
go q.start()
return q
}
// start begins the asynchronous processing loop for the deferral queue
func (q *queue) start() {
go q.deferred.Process(q.ctx, func(item Request) {
q.Add(item)
})
<-q.ctx.Done()
q.cond.Broadcast()
}
// shuttingDown returns whether the queue is in the process of shutting down
func (q *queue) shuttingDown() bool {
select {
case <-q.ctx.Done():
return true
default:
return false
}
}
// Get returns the next Request to be processed by the caller, blocking until
// an item is available in the queue. If the returned shutdown parameter is true,
// then the caller should stop using the queue. Any Requests returned by a call
// to Get must be explicitly marked as processed via the Done method.
func (q *queue) Get() (item Request, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown() {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return Request{}, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.processing[item] = struct{}{}
delete(q.dirty, item)
return item, false
}
// Add puts the given Request in the queue. If the Request is already in
// the queue or the queue is stopping, then this is a no-op.
func (q *queue) Add(item Request) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown() {
return
}
if _, ok := q.dirty[item]; ok {
return
}
q.dirty[item] = struct{}{}
if _, ok := q.processing[item]; ok {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// AddAfter adds a Request to the work queue after a given amount of time.
func (q *queue) AddAfter(item Request, duration time.Duration) {
// don't add if we're already shutting down
if q.shuttingDown() {
return
}
// immediately add if there is no delay
if duration <= 0 {
q.Add(item)
return
}
q.deferred.Defer(q.ctx, item, time.Now().Add(duration))
}
// AddRateLimited adds the given Request to the queue after applying the
// rate limiter to determine when the Request should next be processed.
func (q *queue) AddRateLimited(item Request) {
q.AddAfter(item, q.ratelimiter.NextRetry(item))
}
// Forget signals the queue to reset the rate-limiting for the given Request.
func (q *queue) Forget(item Request) {
q.ratelimiter.Forget(item)
}
// Done removes the item from the queue, if it has been marked dirty
// again while being processed, it is re-added to the queue.
func (q *queue) Done(item Request) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
delete(q.processing, item)
if _, ok := q.dirty[item]; ok {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}

View File

@ -0,0 +1,93 @@
package controller
import (
"sync/atomic"
"time"
)
var _ WorkQueue = &countingWorkQueue{}
type countingWorkQueue struct {
getCounter uint64
addCounter uint64
addAfterCounter uint64
addRateLimitedCounter uint64
forgetCounter uint64
doneCounter uint64
inner WorkQueue
}
func newCountingWorkQueue(inner WorkQueue) *countingWorkQueue {
return &countingWorkQueue{
inner: inner,
}
}
func (c *countingWorkQueue) reset() {
atomic.StoreUint64(&c.getCounter, 0)
atomic.StoreUint64(&c.addCounter, 0)
atomic.StoreUint64(&c.addAfterCounter, 0)
atomic.StoreUint64(&c.addRateLimitedCounter, 0)
atomic.StoreUint64(&c.forgetCounter, 0)
atomic.StoreUint64(&c.doneCounter, 0)
}
func (c *countingWorkQueue) requeues() uint64 {
return c.addAfters() + c.addRateLimiteds()
}
func (c *countingWorkQueue) Get() (item Request, shutdown bool) {
item, err := c.inner.Get()
atomic.AddUint64(&c.getCounter, 1)
return item, err
}
func (c *countingWorkQueue) gets() uint64 {
return atomic.LoadUint64(&c.getCounter)
}
func (c *countingWorkQueue) Add(item Request) {
c.inner.Add(item)
atomic.AddUint64(&c.addCounter, 1)
}
func (c *countingWorkQueue) adds() uint64 {
return atomic.LoadUint64(&c.addCounter)
}
func (c *countingWorkQueue) AddAfter(item Request, duration time.Duration) {
c.inner.AddAfter(item, duration)
atomic.AddUint64(&c.addAfterCounter, 1)
}
func (c *countingWorkQueue) addAfters() uint64 {
return atomic.LoadUint64(&c.addAfterCounter)
}
func (c *countingWorkQueue) AddRateLimited(item Request) {
c.inner.AddRateLimited(item)
atomic.AddUint64(&c.addRateLimitedCounter, 1)
}
func (c *countingWorkQueue) addRateLimiteds() uint64 {
return atomic.LoadUint64(&c.addRateLimitedCounter)
}
func (c *countingWorkQueue) Forget(item Request) {
c.inner.Forget(item)
atomic.AddUint64(&c.forgetCounter, 1)
}
func (c *countingWorkQueue) forgets() uint64 {
return atomic.LoadUint64(&c.forgetCounter)
}
func (c *countingWorkQueue) Done(item Request) {
c.inner.Done(item)
atomic.AddUint64(&c.doneCounter, 1)
}
func (c *countingWorkQueue) dones() uint64 {
return atomic.LoadUint64(&c.doneCounter)
}

View File

@ -0,0 +1,70 @@
package controller
import (
"math"
"sync"
"time"
)
// much of this is a re-implementation of:
// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/default_rate_limiters.go
// Limiter is an interface for a rate limiter that can limit
// the number of retries processed in the work queue.
type Limiter interface {
// NextRetry returns the remaining time until the queue should
// reprocess a Request.
NextRetry(request Request) time.Duration
// Forget causes the Limiter to reset the backoff for the Request.
Forget(request Request)
}
var _ Limiter = &ratelimiter{}
type ratelimiter struct {
failures map[Request]int
base time.Duration
max time.Duration
mutex sync.RWMutex
}
// NewRateLimiter returns a Limiter that does per-item exponential
// backoff.
func NewRateLimiter(base, max time.Duration) Limiter {
return &ratelimiter{
failures: make(map[Request]int),
base: base,
max: max,
}
}
// NextRetry returns the remaining time until the queue should
// reprocess a Request.
func (r *ratelimiter) NextRetry(request Request) time.Duration {
r.mutex.RLock()
defer r.mutex.RUnlock()
exponent := r.failures[request]
r.failures[request] = r.failures[request] + 1
backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exponent))
// make sure we don't overflow time.Duration
if backoff > math.MaxInt64 {
return r.max
}
calculated := time.Duration(backoff)
if calculated > r.max {
return r.max
}
return calculated
}
// Forget causes the Limiter to reset the backoff for the Request.
func (r *ratelimiter) Forget(request Request) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.failures, request)
}

View File

@ -0,0 +1,62 @@
package controller
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestRateLimiter_Backoff(t *testing.T) {
t.Parallel()
limiter := NewRateLimiter(1*time.Millisecond, 1*time.Second)
request := Request{Kind: "one"}
require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))
require.Equal(t, 2*time.Millisecond, limiter.NextRetry(request))
require.Equal(t, 4*time.Millisecond, limiter.NextRetry(request))
require.Equal(t, 8*time.Millisecond, limiter.NextRetry(request))
require.Equal(t, 16*time.Millisecond, limiter.NextRetry(request))
requestTwo := Request{Kind: "two"}
require.Equal(t, 1*time.Millisecond, limiter.NextRetry(requestTwo))
require.Equal(t, 2*time.Millisecond, limiter.NextRetry(requestTwo))
limiter.Forget(request)
require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))
}
func TestRateLimiter_Overflow(t *testing.T) {
t.Parallel()
limiter := NewRateLimiter(1*time.Millisecond, 1000*time.Second)
request := Request{Kind: "one"}
for i := 0; i < 5; i++ {
limiter.NextRetry(request)
}
// ensure we have a normally incrementing exponential backoff
require.Equal(t, 32*time.Millisecond, limiter.NextRetry(request))
overflow := Request{Kind: "overflow"}
for i := 0; i < 1000; i++ {
limiter.NextRetry(overflow)
}
// make sure we're capped at the passed in max backoff
require.Equal(t, 1000*time.Second, limiter.NextRetry(overflow))
limiter = NewRateLimiter(1*time.Minute, 1000*time.Hour)
for i := 0; i < 2; i++ {
limiter.NextRetry(request)
}
// ensure we have a normally incrementing exponential backoff
require.Equal(t, 4*time.Minute, limiter.NextRetry(request))
for i := 0; i < 1000; i++ {
limiter.NextRetry(overflow)
}
// make sure we're capped at the passed in max backoff
require.Equal(t, 1000*time.Hour, limiter.NextRetry(overflow))
}

View File

@ -0,0 +1,51 @@
package controller
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/acl"
)
// Request contains the information necessary to reconcile a config entry.
// This includes only the information required to uniquely identify the
// config entry.
type Request struct {
Kind string
Name string
Meta *acl.EnterpriseMeta
}
// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller, rather than applying
// the backoff algorithm, returning a RequeueAfterError will cause the
// Controller to reschedule the Request at a given time in the future.
type RequeueAfterError time.Duration
// Error implements the error interface.
func (r RequeueAfterError) Error() string {
return fmt.Sprintf("requeue at %s", time.Duration(r))
}
// RequeueAfter constructs a RequeueAfterError with the given duration
// setting.
func RequeueAfter(after time.Duration) error {
return RequeueAfterError(after)
}
// RequeueNow constructs a RequeueAfterError that reschedules the Request
// immediately.
func RequeueNow() error {
return RequeueAfterError(0)
}
// Reconciler is the main implementation interface for Controllers. A Reconciler
// receives any change notifications for config entries that the controller is subscribed
// to and processes them with its Reconcile function.
type Reconciler interface {
// Reconcile performs a reconciliation on the config entry referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil.
// If no error is returned, the Request will be removed from the working queue.
Reconcile(context.Context, Request) error
}

View File

@ -0,0 +1,61 @@
package controller
import (
"context"
"sync"
)
type testReconciler struct {
received chan Request
response error
mutex sync.Mutex
stepChan chan struct{}
stopChan chan struct{}
ctx context.Context
}
func (r *testReconciler) Reconcile(ctx context.Context, req Request) error {
if r.stepChan != nil {
select {
case <-r.stopChan:
return nil
case <-r.stepChan:
}
}
select {
case <-r.stopChan:
return nil
case r.received <- req:
}
r.mutex.Lock()
defer r.mutex.Unlock()
return r.response
}
func (r *testReconciler) setResponse(err error) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.response = err
}
func (r *testReconciler) step() {
r.stepChan <- struct{}{}
}
func (r *testReconciler) stop() {
close(r.stopChan)
}
func newTestReconciler(stepping bool) *testReconciler {
r := &testReconciler{
received: make(chan Request, 1000),
stopChan: make(chan struct{}),
}
if stepping {
r.stepChan = make(chan struct{})
}
return r
}