open-consul/agent/consul/controller/controller.go

290 lines
9.0 KiB
Go

package controller
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/sync/errgroup"
)
// much of this is a re-implementation of
// https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.13/pkg/internal/controller/controller.go
// Transformer is a function that takes one type of config entry that has changed
// and transforms that into a set of reconciliation requests to enqueue.
type Transformer func(entry structs.ConfigEntry) []Request
// Controller subscribes to a set of watched resources from the
// state store and delegates processing them to a given Reconciler.
// If a Reconciler errors while processing a Request, then the
// Controller handles rescheduling the Request to be re-processed.
type Controller interface {
// Run begins the Controller's main processing loop. When the given
// context is canceled, the Controller stops processing any remaining work.
// The Run function should only ever be called once.
Run(ctx context.Context) error
// Subscribe tells the controller to subscribe to updates for config entries based
// on the given request. Optional transformation functions can also be passed in
// to Subscribe, allowing a controller to map a config entry to a different type of
// request under the hood (i.e. watching a dependency and triggering a Reconcile on
// the dependent resource). This should only ever be called prior to calling Run.
Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller
// WithBackoff changes the base and maximum backoff values for the Controller's
// Request retry rate limiter. This should only ever be called prior to
// running Run.
WithBackoff(base, max time.Duration) Controller
// WithWorkers sets the number of worker goroutines used to process the queue
// this defaults to 1 goroutine.
WithWorkers(i int) Controller
// WithQueueFactory allows a Controller to replace its underlying work queue
// implementation. This is most useful for testing. This should only ever be called
// prior to running Run.
WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
}
var _ Controller = &controller{}
type subscription struct {
request *stream.SubscribeRequest
transformers []Transformer
}
// controller implements the Controller interface
type controller struct {
// reconciler is the Reconciler that processes all subscribed
// Requests
reconciler Reconciler
// makeQueue is the factory used for creating the work queue, generally
// this shouldn't be touched, but can be updated for testing purposes
makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue
// workers is the number of workers to use to process data
workers int
// work is the internal work queue that pending Requests are added to
work WorkQueue
// baseBackoff is the starting backoff time for the work queue's rate limiter
baseBackoff time.Duration
// maxBackoff is the maximum backoff time for the work queue's rate limiter
maxBackoff time.Duration
// subscriptions is a list of subscription requests for retrieving configuration entries
subscriptions []subscription
// publisher is the event publisher that should be subscribed to for any updates
publisher state.EventPublisher
// running ensures that we are only calling Run a single time
running int32
}
// New returns a new Controller associated with the given state store and reconciler.
func New(publisher state.EventPublisher, reconciler Reconciler) Controller {
return &controller{
reconciler: reconciler,
publisher: publisher,
workers: 1,
baseBackoff: 5 * time.Millisecond,
maxBackoff: 1000 * time.Second,
makeQueue: RunWorkQueue,
}
}
// Subscribe tells the controller to subscribe to updates for config entries of the
// given kind and with the associated enterprise metadata. This should only ever be
// called prior to running Start.
func (c *controller) Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller {
c.ensureNotRunning()
c.subscriptions = append(c.subscriptions, subscription{
request: request,
transformers: transformers,
})
return c
}
// WithBackoff changes the base and maximum backoff values for the Controller's
// Request retry rate limiter. This should only ever be called prior to
// running Start.
func (c *controller) WithBackoff(base, max time.Duration) Controller {
c.ensureNotRunning()
c.baseBackoff = base
c.maxBackoff = max
return c
}
// WithWorkers sets the number of worker goroutines used to process the queue
// this defaults to 1 goroutine.
func (c *controller) WithWorkers(i int) Controller {
c.ensureNotRunning()
if i <= 0 {
i = 1
}
c.workers = i
return c
}
// WithQueueFactory changes the initialization method for the Controller's work
// queue, this is predominantly just used for testing. This should only ever be called
// prior to running Start.
func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller {
c.ensureNotRunning()
c.makeQueue = fn
return c
}
// ensureNotRunning makes sure we aren't trying to reconfigure an already
// running controller, it panics if Run has already been invoked
func (c *controller) ensureNotRunning() {
if atomic.LoadInt32(&c.running) == 1 {
panic("cannot configure controller once Run is called")
}
}
// Run begins the Controller's main processing loop. When the given
// context is canceled, the Controller stops processing any remaining work.
// The Run function should only ever be called once, calling it multiple
// times will result in a panic.
func (c *controller) Run(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&c.running, 0, 1) {
panic("Run cannot be called more than once")
}
group, groupCtx := errgroup.WithContext(ctx)
// set up our queue
c.work = c.makeQueue(groupCtx, c.baseBackoff, c.maxBackoff)
for _, sub := range c.subscriptions {
// store a reference for the closure
sub := sub
group.Go(func() error {
var index uint64
subscription, err := c.publisher.Subscribe(sub.request)
if err != nil {
return err
}
defer subscription.Unsubscribe()
for {
event, err := subscription.Next(ctx)
switch {
case errors.Is(err, context.Canceled):
return nil
case err != nil:
return err
}
if event.IsFramingEvent() {
continue
}
if event.Index <= index {
continue
}
index = event.Index
if err := c.processEvent(sub, event); err != nil {
return err
}
}
})
}
for i := 0; i < c.workers; i++ {
group.Go(func() error {
for {
request, shutdown := c.work.Get()
if shutdown {
// Stop working
return nil
}
c.reconcileHandler(groupCtx, request)
// Done is called here because it is required to be called
// when we've finished processing each request
c.work.Done(request)
}
})
}
<-groupCtx.Done()
return nil
}
func (c *controller) processEvent(sub subscription, event stream.Event) error {
switch payload := event.Payload.(type) {
case state.EventPayloadConfigEntry:
c.enqueueEntry(payload.Value, sub.transformers...)
return nil
case *stream.PayloadEvents:
for _, event := range payload.Items {
if err := c.processEvent(sub, event); err != nil {
return err
}
}
return nil
default:
return fmt.Errorf("unhandled event type: %T", payload)
}
}
// enqueueEntry adds all of the given entry into the work queue. If given
// one or more transformation functions, it will enqueue all of the resulting
// reconciliation requests returned from each Transformer.
func (c *controller) enqueueEntry(entry structs.ConfigEntry, transformers ...Transformer) {
if len(transformers) == 0 {
c.work.Add(Request{
Kind: entry.GetKind(),
Name: entry.GetName(),
Meta: entry.GetEnterpriseMeta(),
})
} else {
for _, fn := range transformers {
for _, request := range fn(entry) {
c.work.Add(request)
}
}
}
}
// reconcile wraps the reconciler in a panic handler
func (c *controller) reconcile(ctx context.Context, req Request) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic [recovered]: %v", r)
return
}
}()
return c.reconciler.Reconcile(ctx, req)
}
// reconcileHandler invokes the reconciler and looks at its return value
// to determine whether the request should be rescheduled
func (c *controller) reconcileHandler(ctx context.Context, req Request) {
if err := c.reconcile(ctx, req); err != nil {
// handle the case where we're specifically told to requeue later
var requeueAfter RequeueAfterError
if errors.As(err, &requeueAfter) {
c.work.Forget(req)
c.work.AddAfter(req, time.Duration(requeueAfter))
return
}
// fallback to rate limit ourselves
c.work.AddRateLimited(req)
return
}
// if no error then Forget this request so it is not retried
c.work.Forget(req)
}