297 lines
9.0 KiB
Go
297 lines
9.0 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// EventPublisher receives changes events from Publish, and sends them to all
|
|
// registered subscribers.
|
|
type EventPublisher struct {
|
|
// snapCacheTTL controls how long we keep snapshots in our cache before
|
|
// allowing them to be garbage collected and a new one made for subsequent
|
|
// requests for that topic and key. In general this should be pretty short to
|
|
// keep memory overhead of duplicated event data low - snapshots are typically
|
|
// not that expensive, but having a cache for a few seconds can help
|
|
// de-duplicate building the same snapshot over and over again when a
|
|
// thundering herd of watchers all subscribe to the same topic within a few
|
|
// seconds.
|
|
snapCacheTTL time.Duration
|
|
|
|
// This lock protects the topicBuffers, and snapCache
|
|
lock sync.RWMutex
|
|
|
|
// topicBuffers stores the head of the linked-list buffer to publish events to
|
|
// for a topic.
|
|
topicBuffers map[Topic]*eventBuffer
|
|
|
|
// snapCache if a cache of EventSnapshots indexed by topic and key.
|
|
// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
|
|
snapCache map[Topic]map[string]*eventSnapshot
|
|
|
|
subscriptions *subscriptions
|
|
|
|
// publishCh is used to send messages from an active txn to a goroutine which
|
|
// publishes events, so that publishing can happen asynchronously from
|
|
// the Commit call in the FSM hot path.
|
|
publishCh chan changeEvents
|
|
|
|
snapshotHandlers SnapshotHandlers
|
|
}
|
|
|
|
type subscriptions struct {
|
|
// lock for byToken. If both subscription.lock and EventPublisher.lock need
|
|
// to be held, EventPublisher.lock MUST always be acquired first.
|
|
lock sync.RWMutex
|
|
|
|
// byToken is an mapping of active Subscriptions indexed by a the token and
|
|
// a pointer to the request.
|
|
// When the token is modified all subscriptions under that token will be
|
|
// reloaded.
|
|
// A subscription may be unsubscribed by using the pointer to the request.
|
|
byToken map[string]map[*SubscribeRequest]*Subscription
|
|
}
|
|
|
|
type changeEvents struct {
|
|
events []Event
|
|
}
|
|
|
|
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
|
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
|
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
|
|
|
|
// SnapshotAppender appends groups of events to create a Snapshot of state.
|
|
type SnapshotAppender interface {
|
|
// Append events to the snapshot. Every event in the slice must have the same
|
|
// Index, indicating that it is part of the same raft transaction.
|
|
Append(events []Event)
|
|
}
|
|
|
|
// NewEventPublisher returns an EventPublisher for publishing change events.
|
|
// Handlers are used to convert the memDB changes into events.
|
|
// A goroutine is run in the background to publish events to all subscribes.
|
|
// Cancelling the context will shutdown the goroutine, to free resources,
|
|
// and stop all publishing.
|
|
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
|
|
e := &EventPublisher{
|
|
snapCacheTTL: snapCacheTTL,
|
|
topicBuffers: make(map[Topic]*eventBuffer),
|
|
snapCache: make(map[Topic]map[string]*eventSnapshot),
|
|
publishCh: make(chan changeEvents, 64),
|
|
subscriptions: &subscriptions{
|
|
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
|
},
|
|
snapshotHandlers: handlers,
|
|
}
|
|
|
|
go e.handleUpdates(ctx)
|
|
|
|
return e
|
|
}
|
|
|
|
// PublishEvents to all subscribers. tx is a read-only transaction that captures
|
|
// the state at the time the change happened. The caller must never use the tx once
|
|
// it has been passed to PublishChanged.
|
|
func (e *EventPublisher) PublishEvents(events []Event) {
|
|
if len(events) > 0 {
|
|
e.publishCh <- changeEvents{events: events}
|
|
}
|
|
}
|
|
|
|
func (e *EventPublisher) handleUpdates(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
// TODO: also close all subscriptions so the subscribers are moved
|
|
// to the new publisher?
|
|
return
|
|
case update := <-e.publishCh:
|
|
e.sendEvents(update)
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendEvents sends the given events to any applicable topic listeners, as well
|
|
// as any ACL update events to cause affected listeners to reset their stream.
|
|
func (e *EventPublisher) sendEvents(update changeEvents) {
|
|
eventsByTopic := make(map[Topic][]Event)
|
|
for _, event := range update.events {
|
|
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
|
|
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
|
|
continue
|
|
}
|
|
|
|
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
|
|
}
|
|
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
for topic, events := range eventsByTopic {
|
|
e.getTopicBuffer(topic).Append(events)
|
|
}
|
|
}
|
|
|
|
// getTopicBuffer for the topic. Creates a new event buffer if one does not
|
|
// already exist.
|
|
//
|
|
// EventPublisher.lock must be held to call this method.
|
|
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
|
|
buf, ok := e.topicBuffers[topic]
|
|
if !ok {
|
|
buf = newEventBuffer()
|
|
e.topicBuffers[topic] = buf
|
|
}
|
|
return buf
|
|
}
|
|
|
|
// Subscribe returns a new stream.Subscription for the given request. A
|
|
// subscription will stream an initial snapshot of events matching the request
|
|
// if required and then block until new events that modify the request occur, or
|
|
// the context is cancelled. Subscriptions may be forced to reset if the server
|
|
// decides it can no longer maintain correct operation for example if ACL
|
|
// policies changed or the state store was restored.
|
|
//
|
|
// When the caller is finished with the subscription for any reason, it must
|
|
// call Subscription.Unsubscribe to free ACL tracking resources.
|
|
func (e *EventPublisher) Subscribe(
|
|
ctx context.Context,
|
|
req *SubscribeRequest,
|
|
) (*Subscription, error) {
|
|
// Ensure we know how to make a snapshot for this topic
|
|
_, ok := e.snapshotHandlers[req.Topic]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown topic %v", req.Topic)
|
|
}
|
|
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
|
|
// Ensure there is a topic buffer for that topic so we start capturing any
|
|
// future published events.
|
|
buf := e.getTopicBuffer(req.Topic)
|
|
|
|
// See if we need a snapshot
|
|
topicHead := buf.Head()
|
|
var sub *Subscription
|
|
if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index {
|
|
// No need for a snapshot, send the "resume stream" message to signal to
|
|
// client its cache is still good, then follow along from here in the topic.
|
|
e := Event{
|
|
Index: req.Index,
|
|
Topic: req.Topic,
|
|
Key: req.Key,
|
|
Payload: endOfEmptySnapshot{},
|
|
}
|
|
// Make a new buffer to send to the client containing the resume.
|
|
buf := newEventBuffer()
|
|
|
|
// Store the head of that buffer before we append to it to give as the
|
|
// starting point for the subscription.
|
|
subHead := buf.Head()
|
|
|
|
buf.Append([]Event{e})
|
|
|
|
// Now splice the rest of the topic buffer on so the subscription will
|
|
// continue to see future updates in the topic buffer.
|
|
follow, err := topicHead.FollowAfter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
buf.AppendBuffer(follow)
|
|
|
|
sub = newSubscription(ctx, req, subHead)
|
|
} else {
|
|
snap, err := e.getSnapshotLocked(req, topicHead)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sub = newSubscription(ctx, req, snap.Snap)
|
|
}
|
|
|
|
e.subscriptions.add(req, sub)
|
|
// Set unsubscribe so that the caller doesn't need to keep track of the
|
|
// SubscriptionRequest, and can not accidentally call unsubscribe with the
|
|
// wrong value.
|
|
sub.Unsubscribe = func() {
|
|
e.subscriptions.unsubscribe(req)
|
|
}
|
|
return sub, nil
|
|
}
|
|
|
|
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
subsByToken, ok := s.byToken[req.Token]
|
|
if !ok {
|
|
subsByToken = make(map[*SubscribeRequest]*Subscription)
|
|
s.byToken[req.Token] = subsByToken
|
|
}
|
|
subsByToken[req] = sub
|
|
}
|
|
|
|
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
|
|
s.lock.RLock()
|
|
defer s.lock.RUnlock()
|
|
|
|
for _, secretID := range tokenSecretIDs {
|
|
if subs, ok := s.byToken[secretID]; ok {
|
|
for _, sub := range subs {
|
|
sub.Close()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// unsubscribe must be called when a client is no longer interested in a
|
|
// subscription to free resources monitoring changes in it's ACL token.
|
|
//
|
|
// req MUST be the same pointer that was used to register the subscription.
|
|
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
subsByToken, ok := s.byToken[req.Token]
|
|
if !ok {
|
|
return
|
|
}
|
|
delete(subsByToken, req)
|
|
if len(subsByToken) == 0 {
|
|
delete(s.byToken, req.Token)
|
|
}
|
|
}
|
|
|
|
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
|
|
topicSnaps, ok := e.snapCache[req.Topic]
|
|
if !ok {
|
|
topicSnaps = make(map[string]*eventSnapshot)
|
|
e.snapCache[req.Topic] = topicSnaps
|
|
}
|
|
|
|
snap, ok := topicSnaps[req.Key]
|
|
if ok && snap.err() == nil {
|
|
return snap, nil
|
|
}
|
|
|
|
handler, ok := e.snapshotHandlers[req.Topic]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown topic %v", req.Topic)
|
|
}
|
|
|
|
snap = newEventSnapshot(req, topicHead, handler)
|
|
if e.snapCacheTTL > 0 {
|
|
topicSnaps[req.Key] = snap
|
|
|
|
// Trigger a clearout after TTL
|
|
time.AfterFunc(e.snapCacheTTL, func() {
|
|
e.lock.Lock()
|
|
defer e.lock.Unlock()
|
|
delete(topicSnaps, req.Key)
|
|
})
|
|
}
|
|
|
|
return snap, nil
|
|
}
|