EventPublisher: docstrings and getTopicBuffer

also rename commitCh -> publishCh
This commit is contained in:
Daniel Nephin 2020-06-15 16:59:09 -04:00
parent 2020e9c7c7
commit f626c3d6c5
1 changed files with 25 additions and 21 deletions

View File

@ -28,7 +28,7 @@ type EventPublisher struct {
// not that expensive, but having a cache for a few seconds can help // not that expensive, but having a cache for a few seconds can help
// de-duplicate building the same snapshot over and over again when a // de-duplicate building the same snapshot over and over again when a
// thundering herd of watchers all subscribe to the same topic within a few // thundering herd of watchers all subscribe to the same topic within a few
// seconds. TODO // seconds.
snapCacheTTL time.Duration snapCacheTTL time.Duration
// This lock protects the topicBuffers, snapCache and subsByToken maps. // This lock protects the topicBuffers, snapCache and subsByToken maps.
@ -38,8 +38,8 @@ type EventPublisher struct {
// for a topic. // for a topic.
topicBuffers map[stream.Topic]*stream.EventBuffer topicBuffers map[stream.Topic]*stream.EventBuffer
// snapCache stores the head of any snapshot buffers still in cache if caching // snapCache if a cache of EventSnapshots indexed by topic and key.
// is enabled. // TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot snapCache map[stream.Topic]map[string]*stream.EventSnapshot
// snapFns is the set of snapshot functions that were registered bound to the // snapFns is the set of snapshot functions that were registered bound to the
@ -51,9 +51,10 @@ type EventPublisher struct {
// ACL permissions change. // ACL permissions change.
subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
// commitCh decouples the Commit call in the FSM hot path from distributing // publishCh is used to send messages from an active txn to a goroutine which
// the resulting events. // publishes events, so that publishing can happen asynchronously from
commitCh chan commitUpdate // the Commit call in the FSM hot path.
publishCh chan commitUpdate
} }
type commitUpdate struct { type commitUpdate struct {
@ -70,7 +71,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
snapFns: make(map[stream.Topic]stream.SnapFn), snapFns: make(map[stream.Topic]stream.SnapFn),
subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
commitCh: make(chan commitUpdate, 64), publishCh: make(chan commitUpdate, 64),
} }
// create a local handler table // create a local handler table
@ -98,7 +99,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
events = append(events, es...) events = append(events, es...)
} }
} }
e.commitCh <- commitUpdate{ e.publishCh <- commitUpdate{
// TODO: document why it must be created here, and not in the new thread // TODO: document why it must be created here, and not in the new thread
// //
// Create a new transaction since it's going to be used from a different // Create a new transaction since it's going to be used from a different
@ -113,7 +114,7 @@ func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
func (e *EventPublisher) handleUpdates() { func (e *EventPublisher) handleUpdates() {
for { for {
update := <-e.commitCh update := <-e.publishCh
e.sendEvents(update) e.sendEvents(update)
} }
} }
@ -158,15 +159,22 @@ func (e *EventPublisher) sendEvents(update commitUpdate) {
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
} }
// Deliver events
for topic, events := range eventsByTopic { for topic, events := range eventsByTopic {
e.getTopicBuffer(topic).Append(events)
}
}
// getTopicBuffer for the topic. Creates a new event buffer if one does not
// already exist.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer {
buf, ok := e.topicBuffers[topic] buf, ok := e.topicBuffers[topic]
if !ok { if !ok {
buf = stream.NewEventBuffer() buf = stream.NewEventBuffer()
e.topicBuffers[topic] = buf e.topicBuffers[topic] = buf
} }
buf.Append(events) return buf
}
} }
// handleACLUpdate handles an ACL token/policy/role update. This method assumes // handleACLUpdate handles an ACL token/policy/role update. This method assumes
@ -250,11 +258,7 @@ func (e *EventPublisher) Subscribe(
// Ensure there is a topic buffer for that topic so we start capturing any // Ensure there is a topic buffer for that topic so we start capturing any
// future published events. // future published events.
buf, ok := e.topicBuffers[req.Topic] buf := e.getTopicBuffer(req.Topic)
if !ok {
buf = stream.NewEventBuffer()
e.topicBuffers[req.Topic] = buf
}
// See if we need a snapshot // See if we need a snapshot
topicHead := buf.Head() topicHead := buf.Head()