open-nomad/nomad/stream/event_publisher.go

229 lines
5.8 KiB
Go
Raw Normal View History

package stream
import (
"context"
"fmt"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/go-hclog"
)
const (
DefaultTTL = 1 * time.Hour
)
type EventBrokerCfg struct {
EventBufferSize int64
Logger hclog.Logger
OnEvict EvictCallbackFn
}
type EventBroker struct {
// mu protects the eventbuffer and subscriptions
mu sync.Mutex
// eventBuf stores a configurable amount of events in memory
eventBuf *eventBuffer
subscriptions *subscriptions
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan *structs.Events
logger hclog.Logger
}
// NewEventBroker returns an EventBroker for publishing change events.
// A goroutine is run in the background to publish events to an event buffer.
// Cancelling the context will shutdown the goroutine to free resources, and stop
// all publishing.
func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker {
if cfg.Logger == nil {
cfg.Logger = hclog.NewNullLogger()
}
// Set the event buffer size to a minimum
if cfg.EventBufferSize == 0 {
cfg.EventBufferSize = 100
}
buffer := newEventBuffer(cfg.EventBufferSize, cfg.OnEvict)
e := &EventBroker{
logger: cfg.Logger.Named("event_broker"),
eventBuf: buffer,
publishCh: make(chan *structs.Events, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
}
go e.handleUpdates(ctx)
return e
}
// Returns the current length of the event buffer
func (e *EventBroker) Len() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.eventBuf.Len()
}
// Publish events to all subscribers of the event Topic.
func (e *EventBroker) Publish(events *structs.Events) {
if len(events.Events) > 0 {
e.publishCh <- events
}
}
// Subscribe returns a new Subscription for a given request. A Subscription
// will receive an initial empty currentItem value which points to the first item
// in the buffer. This allows the new subscription to call Next() without first checking
// for the current Item.
//
// A Subscription will start at the requested index, or as close as possible to
// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is
// set and the index is no longer in the buffer or not yet in the buffer an error
// will be returned.
//
// When a caller is finished with the subscription it must call Subscription.Unsubscribe
// to free ACL tracking resources. TODO(drew) ACL tracking
func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) {
e.mu.Lock()
defer e.mu.Unlock()
var head *bufferItem
var offset int
if req.Index != 0 {
head, offset = e.eventBuf.StartAtClosest(req.Index)
} else {
head = e.eventBuf.Head()
}
if offset > 0 && req.StartExactlyAtIndex {
return nil, fmt.Errorf("requested index not in buffer")
} else if offset > 0 {
metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset))
e.logger.Debug("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index))
}
// Empty head so that calling Next on sub
start := newBufferItem(&structs.Events{Index: req.Index})
start.link.next.Store(head)
close(start.link.ch)
sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req))
e.subscriptions.add(req, sub)
return sub, nil
}
// CloseAll closes all subscriptions
func (e *EventBroker) CloseAll() {
e.subscriptions.closeAll()
}
func (e *EventBroker) handleUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.sendEvents(update)
}
}
}
// sendEvents sends the given events to the publishers event buffer.
func (e *EventBroker) sendEvents(update *structs.Events) {
e.mu.Lock()
defer e.mu.Unlock()
e.eventBuf.Append(update)
}
type subscriptions struct {
// mu for byToken. If both subscription.mu and EventBroker.mu need
// to be held, EventBroker mutex MUST always be acquired first.
mu sync.RWMutex
// byToken is an mapping of active Subscriptions indexed by a token and
// a pointer to the request.
// When the token is modified all subscriptions under that token will be
// reloaded.
// A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*SubscribeRequest]*Subscription
}
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.mu.Lock()
defer s.mu.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
}
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.forceClose()
}
}
}
}
// unsubscribeFn returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the
// wrong pointer.
func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() {
return func() {
s.mu.Lock()
defer s.mu.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
sub := subsByToken[req]
if sub == nil {
return
}
// close the subscription
sub.forceClose()
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
}
func (s *subscriptions) closeAll() {
s.mu.Lock()
defer s.mu.Unlock()
for _, byRequest := range s.byToken {
for _, sub := range byRequest {
sub.forceClose()
}
}
}