2020-06-02 22:37:10 +00:00
|
|
|
package stream
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"testing"
|
|
|
|
time "time"
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
)
|
|
|
|
|
2020-07-08 04:31:22 +00:00
|
|
|
func noopUnSub() {}
|
|
|
|
|
2020-06-02 22:37:10 +00:00
|
|
|
func TestSubscription(t *testing.T) {
|
2020-07-06 21:29:45 +00:00
|
|
|
eb := newEventBuffer()
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
index := uint64(100)
|
|
|
|
|
|
|
|
startHead := eb.Head()
|
|
|
|
|
|
|
|
// Start with an event in the buffer
|
2020-06-05 23:36:31 +00:00
|
|
|
publishTestEvent(index, eb, "test")
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
|
2020-10-01 17:51:55 +00:00
|
|
|
req := SubscribeRequest{
|
2020-07-07 00:04:24 +00:00
|
|
|
Topic: testTopic,
|
2020-06-02 22:37:10 +00:00
|
|
|
Key: "test",
|
|
|
|
}
|
2020-07-08 04:31:22 +00:00
|
|
|
sub := newSubscription(req, startHead, noopUnSub)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
// First call to sub.Next should return our published event immediately
|
|
|
|
start := time.Now()
|
2020-07-08 04:31:22 +00:00
|
|
|
got, err := sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed := time.Since(start)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, elapsed < 200*time.Millisecond,
|
|
|
|
"Event should have been delivered immediately, took %s", elapsed)
|
2020-10-05 16:38:38 +00:00
|
|
|
require.Equal(t, index, got.Index)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
// Schedule an event publish in a while
|
|
|
|
index++
|
|
|
|
start = time.Now()
|
|
|
|
time.AfterFunc(200*time.Millisecond, func() {
|
2020-06-05 23:36:31 +00:00
|
|
|
publishTestEvent(index, eb, "test")
|
2020-06-02 22:37:10 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
// Next call should block until event is delivered
|
2020-07-08 04:31:22 +00:00
|
|
|
got, err = sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed = time.Since(start)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, elapsed > 200*time.Millisecond,
|
|
|
|
"Event should have been delivered after blocking 200ms, took %s", elapsed)
|
|
|
|
require.True(t, elapsed < 2*time.Second,
|
|
|
|
"Event should have been delivered after short time, took %s", elapsed)
|
2020-10-05 16:38:38 +00:00
|
|
|
require.Equal(t, index, got.Index)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
// Event with wrong key should not be delivered. Deliver a good message right
|
|
|
|
// so we don't have to block test thread forever or cancel func yet.
|
|
|
|
index++
|
2020-06-05 23:36:31 +00:00
|
|
|
publishTestEvent(index, eb, "nope")
|
2020-06-02 22:37:10 +00:00
|
|
|
index++
|
2020-06-05 23:36:31 +00:00
|
|
|
publishTestEvent(index, eb, "test")
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
start = time.Now()
|
2020-07-08 04:31:22 +00:00
|
|
|
got, err = sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed = time.Since(start)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, elapsed < 200*time.Millisecond,
|
|
|
|
"Event should have been delivered immediately, took %s", elapsed)
|
2020-10-05 16:38:38 +00:00
|
|
|
require.Equal(t, index, got.Index)
|
2020-10-27 18:40:06 +00:00
|
|
|
require.Equal(t, "test", got.Payload.(simplePayload).key)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
// Cancelling the subscription context should unblock Next
|
|
|
|
start = time.Now()
|
|
|
|
time.AfterFunc(200*time.Millisecond, func() {
|
|
|
|
cancel()
|
|
|
|
})
|
|
|
|
|
2020-07-08 04:31:22 +00:00
|
|
|
_, err = sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed = time.Since(start)
|
|
|
|
require.Error(t, err)
|
|
|
|
require.True(t, elapsed > 200*time.Millisecond,
|
|
|
|
"Event should have been delivered after blocking 200ms, took %s", elapsed)
|
|
|
|
require.True(t, elapsed < 2*time.Second,
|
|
|
|
"Event should have been delivered after short time, took %s", elapsed)
|
|
|
|
}
|
|
|
|
|
2020-06-19 20:34:50 +00:00
|
|
|
func TestSubscription_Close(t *testing.T) {
|
2020-07-06 21:29:45 +00:00
|
|
|
eb := newEventBuffer()
|
2020-06-02 22:37:10 +00:00
|
|
|
index := uint64(100)
|
|
|
|
startHead := eb.Head()
|
|
|
|
|
|
|
|
// Start with an event in the buffer
|
2020-06-05 23:36:31 +00:00
|
|
|
publishTestEvent(index, eb, "test")
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
|
defer cancel()
|
|
|
|
|
2020-10-01 17:51:55 +00:00
|
|
|
req := SubscribeRequest{
|
2020-07-07 00:04:24 +00:00
|
|
|
Topic: testTopic,
|
2020-06-02 22:37:10 +00:00
|
|
|
Key: "test",
|
|
|
|
}
|
2020-07-08 04:31:22 +00:00
|
|
|
sub := newSubscription(req, startHead, noopUnSub)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
|
|
|
// First call to sub.Next should return our published event immediately
|
|
|
|
start := time.Now()
|
2020-07-08 04:31:22 +00:00
|
|
|
got, err := sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed := time.Since(start)
|
|
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, elapsed < 200*time.Millisecond,
|
|
|
|
"Event should have been delivered immediately, took %s", elapsed)
|
2020-10-05 16:38:38 +00:00
|
|
|
require.Equal(t, index, got.Index)
|
2020-06-02 22:37:10 +00:00
|
|
|
|
2020-06-19 20:34:50 +00:00
|
|
|
// Schedule a Close simulating the server deciding this subscroption
|
2020-06-02 22:37:10 +00:00
|
|
|
// needs to reset (e.g. on ACL perm change).
|
|
|
|
start = time.Now()
|
|
|
|
time.AfterFunc(200*time.Millisecond, func() {
|
2020-07-08 04:31:22 +00:00
|
|
|
sub.forceClose()
|
2020-06-02 22:37:10 +00:00
|
|
|
})
|
|
|
|
|
2020-07-08 04:31:22 +00:00
|
|
|
_, err = sub.Next(ctx)
|
2020-06-02 22:37:10 +00:00
|
|
|
elapsed = time.Since(start)
|
|
|
|
require.Error(t, err)
|
2020-10-15 22:06:04 +00:00
|
|
|
require.Equal(t, ErrSubForceClosed, err)
|
2020-06-02 22:37:10 +00:00
|
|
|
require.True(t, elapsed > 200*time.Millisecond,
|
|
|
|
"Reload should have happened after blocking 200ms, took %s", elapsed)
|
|
|
|
require.True(t, elapsed < 2*time.Second,
|
|
|
|
"Reload should have been delivered after short time, took %s", elapsed)
|
|
|
|
}
|
|
|
|
|
2020-07-06 21:29:45 +00:00
|
|
|
func publishTestEvent(index uint64, b *eventBuffer, key string) {
|
2020-06-05 23:36:31 +00:00
|
|
|
e := Event{
|
2020-10-27 18:40:06 +00:00
|
|
|
Index: index,
|
|
|
|
Topic: testTopic,
|
|
|
|
Payload: simplePayload{key: key},
|
2020-06-02 22:37:10 +00:00
|
|
|
}
|
2020-06-05 23:36:31 +00:00
|
|
|
b.Append([]Event{e})
|
2020-06-02 22:37:10 +00:00
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-27 18:40:06 +00:00
|
|
|
func newSimpleEvent(key string, index uint64) Event {
|
|
|
|
return Event{Index: index, Payload: simplePayload{key: key}}
|
2020-07-15 21:03:39 +00:00
|
|
|
}
|
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
func TestFilterByKey(t *testing.T) {
|
|
|
|
type testCase struct {
|
|
|
|
name string
|
|
|
|
req SubscribeRequest
|
|
|
|
events []Event
|
|
|
|
expectEvent bool
|
|
|
|
expected Event
|
|
|
|
expectedCap int
|
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
fn := func(t *testing.T, tc testCase) {
|
|
|
|
events := make(PayloadEvents, 0, 5)
|
|
|
|
events = append(events, tc.events...)
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
actual, ok := filterByKey(tc.req, events)
|
|
|
|
require.Equal(t, tc.expectEvent, ok)
|
|
|
|
if !tc.expectEvent {
|
|
|
|
return
|
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
require.Equal(t, tc.expected, actual)
|
|
|
|
// test if there was a new array allocated or not
|
|
|
|
require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents)))
|
2020-10-05 16:38:38 +00:00
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
var testCases = []testCase{
|
|
|
|
{
|
|
|
|
name: "all events match, no key or namespace",
|
|
|
|
req: SubscribeRequest{Topic: testTopic},
|
|
|
|
events: []Event{
|
|
|
|
newSimpleEvent("One", 102),
|
|
|
|
newSimpleEvent("Two", 102)},
|
|
|
|
expectEvent: true,
|
|
|
|
expected: Event{
|
|
|
|
Topic: testTopic,
|
|
|
|
Index: 102,
|
|
|
|
Payload: PayloadEvents{
|
|
|
|
newSimpleEvent("One", 102),
|
|
|
|
newSimpleEvent("Two", 102)}},
|
|
|
|
expectedCap: 5,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
name: "all events match, no namespace",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
|
|
events: []Event{
|
|
|
|
newSimpleEvent("Same", 103),
|
|
|
|
newSimpleEvent("Same", 103)},
|
|
|
|
expectEvent: true,
|
|
|
|
expected: Event{
|
|
|
|
Topic: testTopic,
|
|
|
|
Index: 103,
|
|
|
|
Payload: PayloadEvents{
|
|
|
|
newSimpleEvent("Same", 103),
|
|
|
|
newSimpleEvent("Same", 103)}},
|
|
|
|
expectedCap: 5,
|
|
|
|
},
|
2020-10-28 22:35:27 +00:00
|
|
|
{
|
|
|
|
name: "all events match, no key",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
|
|
events: []Event{
|
|
|
|
newNSEvent("Something", "apps"),
|
|
|
|
newNSEvent("Other", "apps")},
|
|
|
|
expectEvent: true,
|
|
|
|
expected: Event{
|
|
|
|
Topic: testTopic,
|
|
|
|
Index: 22,
|
|
|
|
Payload: PayloadEvents{
|
|
|
|
newNSEvent("Something", "apps"),
|
|
|
|
newNSEvent("Other", "apps")}},
|
|
|
|
expectedCap: 5,
|
|
|
|
},
|
2020-10-28 22:22:30 +00:00
|
|
|
{
|
|
|
|
name: "some evens match, no namespace",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
|
|
events: []Event{
|
|
|
|
newSimpleEvent("Same", 104),
|
|
|
|
newSimpleEvent("Other", 104),
|
|
|
|
newSimpleEvent("Same", 104)},
|
|
|
|
expectEvent: true,
|
|
|
|
expected: Event{
|
|
|
|
Topic: testTopic,
|
|
|
|
Index: 104,
|
|
|
|
Payload: PayloadEvents{
|
|
|
|
newSimpleEvent("Same", 104),
|
|
|
|
newSimpleEvent("Same", 104)}},
|
|
|
|
expectedCap: 2,
|
|
|
|
},
|
2020-10-28 22:35:27 +00:00
|
|
|
{
|
|
|
|
name: "some events match, no key",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
|
|
events: []Event{
|
|
|
|
newNSEvent("app1", "apps"),
|
|
|
|
newNSEvent("db1", "dbs"),
|
|
|
|
newNSEvent("app2", "apps")},
|
|
|
|
expectEvent: true,
|
|
|
|
expected: Event{
|
|
|
|
Topic: testTopic,
|
|
|
|
Index: 22,
|
|
|
|
Payload: PayloadEvents{
|
|
|
|
newNSEvent("app1", "apps"),
|
|
|
|
newNSEvent("app2", "apps")}},
|
|
|
|
expectedCap: 2,
|
|
|
|
},
|
2020-10-28 22:22:30 +00:00
|
|
|
{
|
|
|
|
name: "no events match key",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
|
|
|
events: []Event{
|
|
|
|
newSimpleEvent("Same", 0),
|
|
|
|
newSimpleEvent("Same", 0)},
|
|
|
|
},
|
2020-10-28 22:35:27 +00:00
|
|
|
{
|
|
|
|
name: "no events match namespace",
|
|
|
|
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
|
|
events: []Event{
|
|
|
|
newNSEvent("app1", "group1"),
|
|
|
|
newNSEvent("app2", "group2")},
|
|
|
|
expectEvent: false,
|
|
|
|
},
|
2020-10-28 22:22:30 +00:00
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
|
2020-10-28 22:22:30 +00:00
|
|
|
for _, tc := range testCases {
|
|
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
|
|
fn(t, tc)
|
|
|
|
})
|
|
|
|
}
|
2020-07-15 21:03:39 +00:00
|
|
|
}
|
2020-10-28 22:35:27 +00:00
|
|
|
|
|
|
|
func newNSEvent(key, namespace string) Event {
|
|
|
|
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
|
|
|
}
|
|
|
|
|
|
|
|
type nsPayload struct {
|
|
|
|
key string
|
|
|
|
namespace string
|
|
|
|
value string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p nsPayload) FilterByKey(key, namespace string) bool {
|
|
|
|
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
|
|
|
|
}
|