a2f8605c66
Move the subscription context to Next.

context.Context should generally never be stored in a struct because it makes that struct valid only while the context is valid, which is rarely obvious to the caller. This change adds a forceClosed channel in place of the old context, and uses a context passed to Next as the way for the caller to stop the Subscription from blocking.

Remove some recursion from bufferItem.Next. The caller is already looping, so we can continue in that loop instead of recursing. This ensures currentItem is updated immediately (which probably does not matter in practice), and also removes the chance that we overflow the stack.

NextNoBlock and FollowAfter do not need to handle bufferItem.Err; the caller already handles it.

Move filter to a method to simplify Next and more explicitly separate filtering from looping. Only unwrap bufferItem.Err when necessary. Also improve some godoc.
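The Subscription type itself lives in subscription.go, which is not part of this file. As a rough sketch of the pattern the commit describes (the field names and signatures below are assumptions for illustration, not copied code), the context moves from a struct field to a parameter of Next, and a forceClosed channel takes over internal shutdown:

// Illustrative sketch only: subscription.go and event_buffer.go are not shown
// here, so the exact fields and signatures are assumed, not taken from source.
type Subscription struct {
	// forceClosed is closed by the publisher (for example when ACL tokens
	// change) to terminate the subscription. It replaces the context that
	// used to be stored on the struct.
	forceClosed chan struct{}

	// currentItem is the subscription's position in the topic's buffer.
	currentItem *bufferItem
}

// Next blocks until a batch of events is available, ctx is cancelled, or the
// subscription is force-closed. The context lives only for the duration of
// the call rather than the lifetime of the struct.
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
	for {
		next, err := s.currentItem.Next(ctx, s.forceClosed)
		if err != nil {
			return nil, err
		}
		// Advance before filtering, so progress is kept even when the whole
		// batch is filtered out; looping here replaces the old recursion.
		s.currentItem = next

		events := s.filter(next.Events)
		if len(events) == 0 {
			continue
		}
		return events, nil
	}
}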
283 lines | 8.7 KiB | Go
package stream

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// EventPublisher receives change events from Publish, and sends the events to
// all subscribers of the event Topic.
type EventPublisher struct {
	// snapCacheTTL controls how long we keep snapshots in our cache before
	// allowing them to be garbage collected and a new one made for subsequent
	// requests for that topic and key. In general this should be pretty short to
	// keep memory overhead of duplicated event data low - snapshots are typically
	// not that expensive, but having a cache for a few seconds can help
	// de-duplicate building the same snapshot over and over again when a
	// thundering herd of watchers all subscribe to the same topic within a few
	// seconds.
	snapCacheTTL time.Duration

	// This lock protects the topicBuffers and snapCache.
	lock sync.RWMutex

	// topicBuffers stores the head of the linked-list buffer to publish events to
	// for a topic.
	topicBuffers map[Topic]*eventBuffer

	// snapCache is a cache of EventSnapshots indexed by topic and key.
	// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
	snapCache map[Topic]map[string]*eventSnapshot

	subscriptions *subscriptions

	// publishCh is used to send messages from an active txn to a goroutine which
	// publishes events, so that publishing can happen asynchronously from
	// the Commit call in the FSM hot path.
	publishCh chan changeEvents

	snapshotHandlers SnapshotHandlers
}

type subscriptions struct {
	// lock for byToken. If both subscription.lock and EventPublisher.lock need
	// to be held, EventPublisher.lock MUST always be acquired first.
	lock sync.RWMutex

	// byToken is a mapping of active Subscriptions indexed by the token and
	// a pointer to the request.
	// When the token is modified all subscriptions under that token will be
	// reloaded.
	// A subscription may be unsubscribed by using the pointer to the request.
	byToken map[string]map[*SubscribeRequest]*Subscription
}

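// changeEvents is the payload sent on publishCh. It carries the events from a
// single call to Publish to the goroutine which publishes them.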
type changeEvents struct {
	events []Event
}

// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)

// SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface {
	// Append events to the snapshot. Every event in the slice must have the same
	// Index, indicating that it is part of the same raft transaction.
	Append(events []Event)
}

// NewEventPublisher returns an EventPublisher for publishing change events.
// Handlers are used to convert the memDB changes into events.
// A goroutine is run in the background to publish events to all subscribers.
// Cancelling the context will shut down the goroutine, to free resources,
// and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
	e := &EventPublisher{
		snapCacheTTL: snapCacheTTL,
		topicBuffers: make(map[Topic]*eventBuffer),
		snapCache:    make(map[Topic]map[string]*eventSnapshot),
		publishCh:    make(chan changeEvents, 64),
		subscriptions: &subscriptions{
			byToken: make(map[string]map[*SubscribeRequest]*Subscription),
		},
		snapshotHandlers: handlers,
	}

	go e.handleUpdates(ctx)

	return e
}

// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) {
	if len(events) > 0 {
		e.publishCh <- changeEvents{events: events}
	}
}

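// handleUpdates receives batches of events from publishCh and sends them to
// the relevant topic buffers until ctx is cancelled.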
func (e *EventPublisher) handleUpdates(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// TODO: also close all subscriptions so the subscribers are moved
			// to the new publisher?
			return
		case update := <-e.publishCh:
			e.sendEvents(update)
		}
	}
}

// sendEvents sends the given events to any applicable topic listeners, as well
// as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) {
	eventsByTopic := make(map[Topic][]Event)
	for _, event := range update.events {
		if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
			e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
			continue
		}

		eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for topic, events := range eventsByTopic {
		e.getTopicBuffer(topic).Append(events)
	}
}

// getTopicBuffer for the topic. Creates a new event buffer if one does not
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
	buf, ok := e.topicBuffers[topic]
	if !ok {
		buf = newEventBuffer()
		e.topicBuffers[topic] = buf
	}
	return buf
}

// Subscribe returns a new Subscription for the given request. The subscription
// will receive an initial snapshot of events matching the request, unless
// req.Index matches the index of the first event in the topic buffer, in which
// case the snapshot is skipped and only an endOfEmptySnapshot event is sent.
// After the snapshot, events will be streamed as they are created.
// Subscriptions may be closed, forcing the client to resubscribe (for example if
// ACL policies changed or the state store is abandoned).
//
// When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
	// Ensure we know how to make a snapshot for this topic
	_, ok := e.snapshotHandlers[req.Topic]
	if !ok || req.Topic == nil {
		return nil, fmt.Errorf("unknown topic %v", req.Topic)
	}

	e.lock.Lock()
	defer e.lock.Unlock()

	// Ensure there is a topic buffer for that topic so we start capturing any
	// future published events.
	buf := e.getTopicBuffer(req.Topic)

	// See if we need a snapshot
	topicHead := buf.Head()
	var sub *Subscription
	if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
		// No need for a snapshot; send the "end of empty snapshot" message to
		// signal to the client that its cache is still good, then follow along
		// from here in the topic.
		buf := newEventBuffer()

		// Store the head of that buffer before we append to it to give as the
		// starting point for the subscription.
		subHead := buf.Head()

		buf.Append([]Event{{
			Index:   req.Index,
			Topic:   req.Topic,
			Key:     req.Key,
			Payload: endOfEmptySnapshot{},
		}})

		// Now splice the rest of the topic buffer on so the subscription will
		// continue to see future updates in the topic buffer.
		buf.AppendItem(topicHead.NextLink())

		sub = newSubscription(req, subHead, e.subscriptions.unsubscribe(req))
	} else {
		snap, err := e.getSnapshotLocked(req, topicHead)
		if err != nil {
			return nil, err
		}
		sub = newSubscription(req, snap.Head, e.subscriptions.unsubscribe(req))
	}

	e.subscriptions.add(req, sub)
	return sub, nil
}

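// add stores the subscription in byToken, indexed by the request's token and
// the pointer to the request.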
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
	s.lock.Lock()
	defer s.lock.Unlock()

	subsByToken, ok := s.byToken[req.Token]
	if !ok {
		subsByToken = make(map[*SubscribeRequest]*Subscription)
		s.byToken[req.Token] = subsByToken
	}
	subsByToken[req] = sub
}

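// closeSubscriptionsForTokens force-closes all active subscriptions that were
// created using any of the given token secret IDs.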
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, secretID := range tokenSecretIDs {
		if subs, ok := s.byToken[secretID]; ok {
			for _, sub := range subs {
				sub.forceClose()
			}
		}
	}
}

// unsubscribe returns a function that the subscription will call to remove
// itself from subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscribeRequest, and cannot accidentally call unsubscribe with
// the wrong pointer.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
	return func() {
		s.lock.Lock()
		defer s.lock.Unlock()

		subsByToken, ok := s.byToken[req.Token]
		if !ok {
			return
		}
		delete(subsByToken, req)
		if len(subsByToken) == 0 {
			delete(s.byToken, req.Token)
		}
	}
}

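// getSnapshotLocked returns a cached eventSnapshot for the request if one
// exists and has not errored. Otherwise it builds a new snapshot using the
// topic's handler and, if snapCacheTTL is greater than zero, caches it for
// that duration.
//
// EventPublisher.lock must be held to call this method.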
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
	topicSnaps, ok := e.snapCache[req.Topic]
	if !ok {
		topicSnaps = make(map[string]*eventSnapshot)
		e.snapCache[req.Topic] = topicSnaps
	}

	snap, ok := topicSnaps[req.Key]
	if ok && snap.err() == nil {
		return snap, nil
	}

	handler, ok := e.snapshotHandlers[req.Topic]
	if !ok {
		return nil, fmt.Errorf("unknown topic %v", req.Topic)
	}

	snap = newEventSnapshot(req, topicHead, handler)
	if e.snapCacheTTL > 0 {
		topicSnaps[req.Key] = snap

		// Trigger a clearout after TTL
		time.AfterFunc(e.snapCacheTTL, func() {
			e.lock.Lock()
			defer e.lock.Unlock()
			delete(topicSnaps, req.Key)
		})
	}

	return snap, nil
}