submatview: remove method receiver from handlers
This commit is contained in:
parent
f857aef4a8
commit
e5813bd9d6
|
@ -10,37 +10,45 @@ import (
|
|||
// If eventHandler fails to handle the events it may return an error. If an
|
||||
// error is returned the next eventHandler will be ignored.
|
||||
// eventHandler is used to implement a very simple finite-state machine.
|
||||
type eventHandler func(events *pbsubscribe.Event) (next eventHandler, err error)
|
||||
type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error)
|
||||
|
||||
func (m *Materializer) initialHandler(index uint64) eventHandler {
|
||||
if index == 0 {
|
||||
return newSnapshotHandler(m)
|
||||
}
|
||||
return m.resumeStreamHandler
|
||||
type viewState interface {
|
||||
updateView(events []*pbsubscribe.Event, index uint64) error
|
||||
reset()
|
||||
}
|
||||
|
||||
func initialHandler(index uint64) eventHandler {
|
||||
if index == 0 {
|
||||
return newSnapshotHandler()
|
||||
}
|
||||
return resumeStreamHandler
|
||||
}
|
||||
|
||||
// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
|
||||
// it updates the view, and then returns eventStreamHandler to handle new events.
|
||||
type snapshotHandler struct {
|
||||
material *Materializer
|
||||
events []*pbsubscribe.Event
|
||||
}
|
||||
|
||||
func newSnapshotHandler(m *Materializer) eventHandler {
|
||||
return (&snapshotHandler{material: m}).handle
|
||||
func newSnapshotHandler() eventHandler {
|
||||
return (&snapshotHandler{}).handle
|
||||
}
|
||||
|
||||
func (h *snapshotHandler) handle(event *pbsubscribe.Event) (eventHandler, error) {
|
||||
func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
|
||||
if event.GetEndOfSnapshot() {
|
||||
err := h.material.updateView(h.events, event.Index)
|
||||
return h.material.eventStreamHandler, err
|
||||
err := state.updateView(h.events, event.Index)
|
||||
return eventStreamHandler, err
|
||||
}
|
||||
|
||||
h.events = append(h.events, eventsFromEvent(event)...)
|
||||
return h.handle, nil
|
||||
}
|
||||
|
||||
func (m *Materializer) eventStreamHandler(event *pbsubscribe.Event) (eventHandler, error) {
|
||||
err := m.updateView(eventsFromEvent(event), event.Index)
|
||||
return m.eventStreamHandler, err
|
||||
// eventStreamHandler handles events by updating the view. It always returns
|
||||
// itself as the next handler.
|
||||
func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
|
||||
err := state.updateView(eventsFromEvent(event), event.Index)
|
||||
return eventStreamHandler, err
|
||||
}
|
||||
|
||||
func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
|
||||
|
@ -50,10 +58,13 @@ func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
|
|||
return []*pbsubscribe.Event{event}
|
||||
}
|
||||
|
||||
func (m *Materializer) resumeStreamHandler(event *pbsubscribe.Event) (eventHandler, error) {
|
||||
// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it
|
||||
// is it resets the view and returns a snapshotHandler to handle the next event.
|
||||
// Otherwise it uses eventStreamHandler to handle events.
|
||||
func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
|
||||
if event.GetNewSnapshotToFollow() {
|
||||
m.reset()
|
||||
return newSnapshotHandler(m), nil
|
||||
state.reset()
|
||||
return newSnapshotHandler(), nil
|
||||
}
|
||||
return m.eventStreamHandler(event)
|
||||
return eventStreamHandler(state, event)
|
||||
}
|
||||
|
|
|
@ -129,7 +129,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs
|
|||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
m.handler = m.initialHandler(req.Index)
|
||||
m.handler = initialHandler(req.Index)
|
||||
|
||||
s, err := m.deps.Client.Subscribe(ctx, &req)
|
||||
if err != nil {
|
||||
|
@ -146,7 +146,7 @@ func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.Subs
|
|||
return err
|
||||
}
|
||||
|
||||
m.handler, err = m.handler(event)
|
||||
m.handler, err = m.handler(m, event)
|
||||
if err != nil {
|
||||
m.reset()
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue