stream.EventSnapshot: reduce the fields on the struct

Many of the fields are only needed in one place, and by using a closure
they can be removed from the struct. This reduces the scope of the variables
making it esier to see how they are used.
This commit is contained in:
Daniel Nephin 2020-06-10 19:07:58 -04:00
parent 7196917051
commit 606121fae6
1 changed files with 33 additions and 53 deletions

View File

@ -3,30 +3,19 @@ package stream
// EventSnapshot represents the state of memdb for a given topic and key at some // EventSnapshot represents the state of memdb for a given topic and key at some
// point in time. It is modelled as a buffer of events so that snapshots can be // point in time. It is modelled as a buffer of events so that snapshots can be
// streamed to possibly multiple subscribers concurrently, and can be trivially // streamed to possibly multiple subscribers concurrently, and can be trivially
// cached by just keeping the Snapshot around. Once the EventSnapshot is dropped // cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot
// from memory, any subscribers still reading from it may do so by following // is dropped from memory, any subscribers still reading from it may do so by following
// their pointers but eventually the snapshot is garbage collected automatically // their pointers. When the last subscribe unsubscribes the snapshot is garbage
// by Go's runtime, simplifying snapshot and buffer management dramatically. // collected automatically by Go's runtime. This simplifies snapshot and buffer
// management dramatically.
type EventSnapshot struct { type EventSnapshot struct {
// Request that this snapshot satisfies.
Request *SubscribeRequest
// Snap is the first item in the buffer containing the snapshot. Once the // Snap is the first item in the buffer containing the snapshot. Once the
// snapshot is complete, subsequent update's BufferItems are appended such // snapshot is complete, subsequent BufferItems are appended to snapBuffer,
// that subscribers just need to follow this buffer for the duration of their // so that subscribers receive all the events from the same buffer.
// subscription stream.
Snap *BufferItem Snap *BufferItem
// snapBuffer is the Head of the snapshot buffer the fn should write to. // snapBuffer is the Head of the snapshot buffer the fn should write to.
snapBuffer *EventBuffer snapBuffer *EventBuffer
// topicBufferHead stored the current most-recent published item from before
// the snapshot was taken such that anything published during snapshot
// publishing can be captured.
topicBufferHead *BufferItem
// SnapFn is the function that will make the snapshot for this request.
fn SnapFn
} }
// SnapFn is the type of function needed to generate a snapshot for a topic and // SnapFn is the type of function needed to generate a snapshot for a topic and
@ -43,39 +32,32 @@ type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error)
func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
buf := NewEventBuffer() buf := NewEventBuffer()
s := &EventSnapshot{ s := &EventSnapshot{
Request: req,
Snap: buf.Head(), Snap: buf.Head(),
snapBuffer: buf, snapBuffer: buf,
topicBufferHead: topicBufferHead,
fn: fn,
} }
go s.doSnapshot()
return s
}
func (s *EventSnapshot) doSnapshot() { go func() {
// Call snapshot func idx, err := fn(req, s.snapBuffer)
idx, err := s.fn(s.Request, s.snapBuffer)
if err != nil { if err != nil {
// Append an error result to signal to subscribers that this snapshot is no
// good.
s.snapBuffer.AppendErr(err) s.snapBuffer.AppendErr(err)
return return
} }
// We wrote the snapshot events to the buffer, send the "end of snapshot" event // We wrote the snapshot events to the buffer, send the "end of snapshot" event
s.snapBuffer.Append([]Event{{ s.snapBuffer.Append([]Event{{
Topic: s.Request.Topic, Topic: req.Topic,
Key: s.Request.Key, Key: req.Key,
Index: idx, Index: idx,
Payload: &agentpb.Event_EndOfSnapshot{ Payload: endOfSnapshot{},
EndOfSnapshot: true,
},
}}) }})
s.spliceFromTopicBuffer(topicBufferHead, idx)
}()
return s
}
func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx uint64) {
// Now splice on the topic buffer. We need to iterate through the buffer to // Now splice on the topic buffer. We need to iterate through the buffer to
// find the first event after the current snapshot. // find the first event after the current snapshot.
item := s.topicBufferHead item := topicBufferHead
for { for {
// Find the next item that we should include. // Find the next item that we should include.
next, err := item.NextNoBlock() next, err := item.NextNoBlock()
@ -108,15 +90,13 @@ func (s *EventSnapshot) doSnapshot() {
return return
} }
if len(next.Events) > 0 { if len(next.Events) > 0 && next.Events[0].Index > idx {
if next.Events[0].Index > idx {
// We've found an update in the topic buffer that happened after our // We've found an update in the topic buffer that happened after our
// snapshot was taken, splice it into the snapshot buffer so subscribers // snapshot was taken, splice it into the snapshot buffer so subscribers
// can continue to read this and others after it. // can continue to read this and others after it.
s.snapBuffer.AppendBuffer(next) s.snapBuffer.AppendBuffer(next)
return return
} }
}
// We don't need this item, continue to next // We don't need this item, continue to next
item = next item = next
} }