stream: move event filtering to PayloadEvents
Removes the weirdness around PayloadEvents.FilterByKey
This commit is contained in:
parent
dcacfd3548
commit
8a26bca020
|
@ -24,62 +24,55 @@ type Payload interface {
|
|||
// Generally this means that the payload matches the key and namespace or
|
||||
// the payload is a special framing event that should be returned to every
|
||||
// subscription.
|
||||
// TODO: rename to MatchesKey
|
||||
FilterByKey(key, namespace string) bool
|
||||
}
|
||||
|
||||
// Len returns the number of events contained within this event. If the Payload
|
||||
// is a []Event, the length of that slice is returned. Otherwise 1 is returned.
|
||||
func (e Event) Len() int {
|
||||
if batch, ok := e.Payload.(PayloadEvents); ok {
|
||||
return len(batch)
|
||||
}
|
||||
return 1
|
||||
// PayloadEvents is an Payload which contains multiple Events.
|
||||
type PayloadEvents struct {
|
||||
Items []Event
|
||||
}
|
||||
|
||||
// Filter returns an Event filtered to only those Events where f returns true.
|
||||
// If the second return value is false, every Event was removed by the filter.
|
||||
func (e Event) Filter(f func(Event) bool) (Event, bool) {
|
||||
batch, ok := e.Payload.(PayloadEvents)
|
||||
if !ok {
|
||||
return e, f(e)
|
||||
}
|
||||
func NewPayloadEvents(items ...Event) *PayloadEvents {
|
||||
return &PayloadEvents{Items: items}
|
||||
}
|
||||
|
||||
func (p *PayloadEvents) filter(f func(Event) bool) bool {
|
||||
items := p.Items
|
||||
|
||||
// To avoid extra allocations, iterate over the list of events first and
|
||||
// get a count of the total desired size. This trades off some extra cpu
|
||||
// time in the worse case (when not all items match the filter), for
|
||||
// fewer memory allocations.
|
||||
var size int
|
||||
for idx := range batch {
|
||||
if f(batch[idx]) {
|
||||
for idx := range items {
|
||||
if f(items[idx]) {
|
||||
size++
|
||||
}
|
||||
}
|
||||
if len(batch) == size || size == 0 {
|
||||
return e, size != 0
|
||||
if len(items) == size || size == 0 {
|
||||
return size != 0
|
||||
}
|
||||
|
||||
filtered := make(PayloadEvents, 0, size)
|
||||
for idx := range batch {
|
||||
event := batch[idx]
|
||||
filtered := make([]Event, 0, size)
|
||||
for idx := range items {
|
||||
event := items[idx]
|
||||
if f(event) {
|
||||
filtered = append(filtered, event)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
return e, false
|
||||
}
|
||||
e.Payload = filtered
|
||||
return e, true
|
||||
p.Items = filtered
|
||||
return true
|
||||
}
|
||||
|
||||
// PayloadEvents is an Payload which contains multiple Events.
|
||||
type PayloadEvents []Event
|
||||
func (p *PayloadEvents) FilterByKey(key, namespace string) bool {
|
||||
return p.filter(func(event Event) bool {
|
||||
return event.Payload.FilterByKey(key, namespace)
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: this method is not called, but needs to exist so that we can store
|
||||
// a slice of events as a payload. In the future we should be able to refactor
|
||||
// Event.Filter so that this FilterByKey includes the re-slicing.
|
||||
func (e PayloadEvents) FilterByKey(_, _ string) bool {
|
||||
return true
|
||||
func (p *PayloadEvents) Len() int {
|
||||
return len(p.Items)
|
||||
}
|
||||
|
||||
// IsEndOfSnapshot returns true if this is a framing event that indicates the
|
||||
|
|
|
@ -15,3 +15,134 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
|
|||
require.False(t, e.IsEndOfSnapshot())
|
||||
})
|
||||
}
|
||||
|
||||
func newSimpleEvent(key string, index uint64) Event {
|
||||
return Event{Index: index, Payload: simplePayload{key: key}}
|
||||
}
|
||||
|
||||
func TestPayloadEvents_FilterByKey(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
req SubscribeRequest
|
||||
events []Event
|
||||
expectEvent bool
|
||||
expected *PayloadEvents
|
||||
expectedCap int
|
||||
}
|
||||
|
||||
fn := func(t *testing.T, tc testCase) {
|
||||
events := make([]Event, 0, 5)
|
||||
events = append(events, tc.events...)
|
||||
|
||||
pe := &PayloadEvents{Items: events}
|
||||
ok := pe.FilterByKey(tc.req.Key, tc.req.Namespace)
|
||||
require.Equal(t, tc.expectEvent, ok)
|
||||
if !tc.expectEvent {
|
||||
return
|
||||
}
|
||||
|
||||
require.Equal(t, tc.expected, pe)
|
||||
// test if there was a new array allocated or not
|
||||
require.Equal(t, tc.expectedCap, cap(pe.Items))
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "all events match, no key or namespace",
|
||||
req: SubscribeRequest{Topic: testTopic},
|
||||
events: []Event{
|
||||
newSimpleEvent("One", 102),
|
||||
newSimpleEvent("Two", 102)},
|
||||
expectEvent: true,
|
||||
expected: NewPayloadEvents(
|
||||
newSimpleEvent("One", 102),
|
||||
newSimpleEvent("Two", 102)),
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "all events match, no namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 103),
|
||||
newSimpleEvent("Same", 103)},
|
||||
expectEvent: true,
|
||||
expected: NewPayloadEvents(
|
||||
newSimpleEvent("Same", 103),
|
||||
newSimpleEvent("Same", 103)),
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "all events match, no key",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("Something", "apps"),
|
||||
newNSEvent("Other", "apps")},
|
||||
expectEvent: true,
|
||||
expected: NewPayloadEvents(
|
||||
newNSEvent("Something", "apps"),
|
||||
newNSEvent("Other", "apps")),
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "some evens match, no namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 104),
|
||||
newSimpleEvent("Other", 104),
|
||||
newSimpleEvent("Same", 104)},
|
||||
expectEvent: true,
|
||||
expected: NewPayloadEvents(
|
||||
newSimpleEvent("Same", 104),
|
||||
newSimpleEvent("Same", 104)),
|
||||
expectedCap: 2,
|
||||
},
|
||||
{
|
||||
name: "some events match, no key",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("app1", "apps"),
|
||||
newNSEvent("db1", "dbs"),
|
||||
newNSEvent("app2", "apps")},
|
||||
expectEvent: true,
|
||||
expected: NewPayloadEvents(
|
||||
newNSEvent("app1", "apps"),
|
||||
newNSEvent("app2", "apps")),
|
||||
expectedCap: 2,
|
||||
},
|
||||
{
|
||||
name: "no events match key",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 0),
|
||||
newSimpleEvent("Same", 0)},
|
||||
},
|
||||
{
|
||||
name: "no events match namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("app1", "group1"),
|
||||
newNSEvent("app2", "group2")},
|
||||
expectEvent: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fn(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newNSEvent(key, namespace string) Event {
|
||||
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
||||
}
|
||||
|
||||
type nsPayload struct {
|
||||
key string
|
||||
namespace string
|
||||
value string
|
||||
}
|
||||
|
||||
func (p nsPayload) FilterByKey(key, namespace string) bool {
|
||||
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
||||
}
|
||||
|
|
|
@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
|
|||
if len(next.Events) == 0 {
|
||||
continue
|
||||
}
|
||||
event, ok := filterByKey(s.req, next.Events)
|
||||
if !ok {
|
||||
event := newEventFromBatch(s.req, next.Events)
|
||||
if !event.Payload.FilterByKey(s.req.Key, s.req.Namespace) {
|
||||
continue
|
||||
}
|
||||
return event, nil
|
||||
|
@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
|
|||
return Event{
|
||||
Topic: req.Topic,
|
||||
Index: first.Index,
|
||||
Payload: PayloadEvents(events),
|
||||
Payload: NewPayloadEvents(events...),
|
||||
}
|
||||
}
|
||||
|
||||
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
|
||||
event := newEventFromBatch(req, events)
|
||||
if req.Key == "" && req.Namespace == "" {
|
||||
return event, true
|
||||
}
|
||||
|
||||
fn := func(e Event) bool {
|
||||
return e.Payload.FilterByKey(req.Key, req.Namespace)
|
||||
}
|
||||
return event.Filter(fn)
|
||||
}
|
||||
|
||||
// Close the subscription. Subscribers will receive an error when they call Next,
|
||||
// and will need to perform a new Subscribe request.
|
||||
// It is safe to call from any goroutine.
|
||||
|
|
|
@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
|
|||
b.Append([]Event{e})
|
||||
}
|
||||
|
||||
func newSimpleEvent(key string, index uint64) Event {
|
||||
return Event{Index: index, Payload: simplePayload{key: key}}
|
||||
}
|
||||
|
||||
func TestFilterByKey(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
req SubscribeRequest
|
||||
events []Event
|
||||
expectEvent bool
|
||||
expected Event
|
||||
expectedCap int
|
||||
}
|
||||
|
||||
fn := func(t *testing.T, tc testCase) {
|
||||
events := make(PayloadEvents, 0, 5)
|
||||
events = append(events, tc.events...)
|
||||
|
||||
actual, ok := filterByKey(tc.req, events)
|
||||
require.Equal(t, tc.expectEvent, ok)
|
||||
if !tc.expectEvent {
|
||||
return
|
||||
func TestNewEventsFromBatch(t *testing.T) {
|
||||
t.Run("single item", func(t *testing.T) {
|
||||
first := Event{
|
||||
Topic: testTopic,
|
||||
Index: 1234,
|
||||
Payload: simplePayload{key: "key"},
|
||||
}
|
||||
|
||||
require.Equal(t, tc.expected, actual)
|
||||
// test if there was a new array allocated or not
|
||||
require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents)))
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "all events match, no key or namespace",
|
||||
req: SubscribeRequest{Topic: testTopic},
|
||||
events: []Event{
|
||||
newSimpleEvent("One", 102),
|
||||
newSimpleEvent("Two", 102)},
|
||||
expectEvent: true,
|
||||
expected: Event{
|
||||
Topic: testTopic,
|
||||
Index: 102,
|
||||
Payload: PayloadEvents{
|
||||
newSimpleEvent("One", 102),
|
||||
newSimpleEvent("Two", 102)}},
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "all events match, no namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 103),
|
||||
newSimpleEvent("Same", 103)},
|
||||
expectEvent: true,
|
||||
expected: Event{
|
||||
Topic: testTopic,
|
||||
Index: 103,
|
||||
Payload: PayloadEvents{
|
||||
newSimpleEvent("Same", 103),
|
||||
newSimpleEvent("Same", 103)}},
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "all events match, no key",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("Something", "apps"),
|
||||
newNSEvent("Other", "apps")},
|
||||
expectEvent: true,
|
||||
expected: Event{
|
||||
Topic: testTopic,
|
||||
Index: 22,
|
||||
Payload: PayloadEvents{
|
||||
newNSEvent("Something", "apps"),
|
||||
newNSEvent("Other", "apps")}},
|
||||
expectedCap: 5,
|
||||
},
|
||||
{
|
||||
name: "some evens match, no namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 104),
|
||||
newSimpleEvent("Other", 104),
|
||||
newSimpleEvent("Same", 104)},
|
||||
expectEvent: true,
|
||||
expected: Event{
|
||||
Topic: testTopic,
|
||||
Index: 104,
|
||||
Payload: PayloadEvents{
|
||||
newSimpleEvent("Same", 104),
|
||||
newSimpleEvent("Same", 104)}},
|
||||
expectedCap: 2,
|
||||
},
|
||||
{
|
||||
name: "some events match, no key",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("app1", "apps"),
|
||||
newNSEvent("db1", "dbs"),
|
||||
newNSEvent("app2", "apps")},
|
||||
expectEvent: true,
|
||||
expected: Event{
|
||||
Topic: testTopic,
|
||||
Index: 22,
|
||||
Payload: PayloadEvents{
|
||||
newNSEvent("app1", "apps"),
|
||||
newNSEvent("app2", "apps")}},
|
||||
expectedCap: 2,
|
||||
},
|
||||
{
|
||||
name: "no events match key",
|
||||
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
||||
events: []Event{
|
||||
newSimpleEvent("Same", 0),
|
||||
newSimpleEvent("Same", 0)},
|
||||
},
|
||||
{
|
||||
name: "no events match namespace",
|
||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
||||
events: []Event{
|
||||
newNSEvent("app1", "group1"),
|
||||
newNSEvent("app2", "group2")},
|
||||
expectEvent: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fn(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newNSEvent(key, namespace string) Event {
|
||||
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
||||
}
|
||||
|
||||
type nsPayload struct {
|
||||
key string
|
||||
namespace string
|
||||
value string
|
||||
}
|
||||
|
||||
func (p nsPayload) FilterByKey(key, namespace string) bool {
|
||||
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
||||
e := newEventFromBatch(SubscribeRequest{}, []Event{first})
|
||||
require.Equal(t, first, e)
|
||||
})
|
||||
t.Run("many items", func(t *testing.T) {
|
||||
events := []Event{
|
||||
newSimpleEvent("foo", 9999),
|
||||
newSimpleEvent("foo", 9999),
|
||||
newSimpleEvent("zee", 9999),
|
||||
}
|
||||
req := SubscribeRequest{Topic: testTopic}
|
||||
e := newEventFromBatch(req, events)
|
||||
expected := Event{
|
||||
Topic: testTopic,
|
||||
Index: 9999,
|
||||
Payload: NewPayloadEvents(events...),
|
||||
}
|
||||
require.Equal(t, expected, e)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue