stream: have SnapshotFunc accept a non-pointer SubscribeRequest

The value is not expected to be modified. Passing a value makes that explicit.
This commit is contained in:
Daniel Nephin 2020-07-14 19:23:44 -04:00
parent 3c77f4c7d2
commit 81cc3daf69
5 changed files with 12 additions and 10 deletions

View File

@ -376,7 +376,7 @@ var topicService stream.Topic = topic("test-topic-service")
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{
topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
if err != nil { if err != nil {
return idx, err return idx, err

View File

@ -61,7 +61,11 @@ type changeEvents struct {
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used. // The nil Topic is reserved and should not be used.
type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) type SnapshotHandlers map[Topic]SnapshotFunc
// SnapshotFunc builds a snapshot for the subscription request, and appends the
// events to the Snapshot using SnapshotAppender.
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
// SnapshotAppender appends groups of events to create a Snapshot of state. // SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface { type SnapshotAppender interface {

View File

@ -58,7 +58,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
func newTestSnapshotHandlers() SnapshotHandlers { func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{ return SnapshotHandlers{
testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
if req.Topic != testTopic { if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic) return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
} }
@ -117,7 +117,7 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
t.Cleanup(cancel) t.Cleanup(cancel)
handlers := newTestSnapshotHandlers() handlers := newTestSnapshotHandlers()
fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { fn := func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
return 0, nil return 0, nil
} }
handlers[intTopic(22)] = fn handlers[intTopic(22)] = fn

View File

@ -18,8 +18,6 @@ type eventSnapshot struct {
snapBuffer *eventBuffer snapBuffer *eventBuffer
} }
type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
// newEventSnapshot creates a snapshot buffer based on the subscription request. // newEventSnapshot creates a snapshot buffer based on the subscription request.
// The current buffer head for the topic requested is passed so that once the // The current buffer head for the topic requested is passed so that once the
// snapshot is complete and has been delivered into the buffer, any events // snapshot is complete and has been delivered into the buffer, any events
@ -27,7 +25,7 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
// missed. Once the snapshot is delivered the topic buffer is spliced onto the // missed. Once the snapshot is delivered the topic buffer is spliced onto the
// snapshot buffer so that subscribers will naturally follow from the snapshot // snapshot buffer so that subscribers will naturally follow from the snapshot
// to wait for any subsequent updates. // to wait for any subsequent updates.
func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot { func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
buf := newEventBuffer() buf := newEventBuffer()
s := &eventSnapshot{ s := &eventSnapshot{
Head: buf.Head(), Head: buf.Head(),
@ -35,7 +33,7 @@ func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn sna
} }
go func() { go func() {
idx, err := fn(req, s.snapBuffer) idx, err := fn(*req, s.snapBuffer)
if err != nil { if err != nil {
s.snapBuffer.AppendItem(&bufferItem{Err: err}) s.snapBuffer.AppendItem(&bufferItem{Err: err})
return return

View File

@ -161,8 +161,8 @@ func genSequentialIDs(start, end int) []string {
return ids return ids
} }
func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc { func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc {
return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
// Event content is arbitrary we are just using Health because it's the // Event content is arbitrary we are just using Health because it's the
// first type defined. We just want a set of things with consecutive // first type defined. We just want a set of things with consecutive