c463479848
properly wire up durable event count move newline responsibility moves newline creation from NDJson to the http handler, json stream only encodes and sends now ignore snapshot restore if broker is disabled enable dev mode to access event steam without acl use mapping instead of switch use pointers for config sizes, remove unused ttl, simplify closed conn logic
246 lines
6 KiB
Go
246 lines
6 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEventBufferFuzz(t *testing.T) {
|
|
nReaders := 1000
|
|
nMessages := 1000
|
|
|
|
b := newEventBuffer(1000, nil)
|
|
|
|
// Start a write goroutine that will publish 10000 messages with sequential
|
|
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
|
// waiting for updates).
|
|
go func() {
|
|
seed := time.Now().UnixNano()
|
|
t.Logf("Using seed %d", seed)
|
|
// z is a Zipfian distribution that gives us a number of milliseconds to
|
|
// sleep which are mostly low - near zero but occasionally spike up to near
|
|
// 100.
|
|
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
|
|
|
|
for i := 0; i < nMessages; i++ {
|
|
// Event content is arbitrary and not valid for our use of buffers in
|
|
// streaming - here we only care about the semantics of the buffer.
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
// Sleep sometimes for a while to let some subscribers catch up
|
|
wait := time.Duration(z.Uint64()) * time.Millisecond
|
|
time.Sleep(wait)
|
|
}
|
|
}()
|
|
|
|
// Run n subscribers following and verifying
|
|
errCh := make(chan error, nReaders)
|
|
|
|
// Load head here so all subscribers start from the same point or they might
|
|
// not run until several appends have already happened.
|
|
head := b.Head()
|
|
|
|
for i := 0; i < nReaders; i++ {
|
|
go func(i int) {
|
|
expect := uint64(0)
|
|
item := head
|
|
var err error
|
|
for {
|
|
item, err = item.Next(context.Background(), nil)
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
|
expect, err)
|
|
return
|
|
}
|
|
if item.Events.Events[0].Index != expect {
|
|
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
|
|
expect, item.Events.Events[0].Index)
|
|
return
|
|
}
|
|
expect++
|
|
if expect == uint64(nMessages) {
|
|
// Succeeded
|
|
errCh <- nil
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for all readers to finish one way or other
|
|
for i := 0; i < nReaders; i++ {
|
|
err := <-errCh
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func TestEventBuffer_Slow_Reader(t *testing.T) {
|
|
b := newEventBuffer(10, nil)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
head := b.Head()
|
|
|
|
for i := 10; i < 15; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
// Ensure the slow reader errors to handle dropped events and
|
|
// fetch latest head
|
|
ev, err := head.Next(context.Background(), nil)
|
|
require.Error(t, err)
|
|
require.Nil(t, ev)
|
|
|
|
newHead := b.Head()
|
|
require.Equal(t, 5, int(newHead.Events.Index))
|
|
}
|
|
|
|
func TestEventBuffer_Size(t *testing.T) {
|
|
b := newEventBuffer(100, nil)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, b.Len())
|
|
}
|
|
|
|
// TestEventBuffer_Emptying_Buffer tests the behavior when all items
|
|
// are removed, the event buffer should advance its head down to the last message
|
|
// and insert a placeholder sentinel value.
|
|
func TestEventBuffer_Emptying_Buffer(t *testing.T) {
|
|
b := newEventBuffer(10, nil)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, int(b.Len()))
|
|
|
|
// empty the buffer, which will bring the event buffer down
|
|
// to a single sentinel value
|
|
for i := 0; i < 11; i++ {
|
|
b.advanceHead()
|
|
}
|
|
|
|
// head and tail are now a sentinel value
|
|
head := b.Head()
|
|
tail := b.Tail()
|
|
require.Equal(t, 0, int(head.Events.Index))
|
|
require.Equal(t, 0, b.Len())
|
|
require.Equal(t, head, tail)
|
|
|
|
e := structs.Event{
|
|
Index: uint64(100),
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(100), Events: []structs.Event{e}})
|
|
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
|
|
defer cancel()
|
|
|
|
next, err := head.Next(ctx, make(chan struct{}))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, next)
|
|
require.Equal(t, uint64(100), next.Events.Index)
|
|
|
|
}
|
|
|
|
func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
|
|
cases := []struct {
|
|
desc string
|
|
req uint64
|
|
expected uint64
|
|
offset int
|
|
}{
|
|
{
|
|
desc: "requested index less than head receives head",
|
|
req: 10,
|
|
expected: 11,
|
|
offset: 1,
|
|
},
|
|
{
|
|
desc: "requested exact match head",
|
|
req: 11,
|
|
expected: 11,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested exact match",
|
|
req: 42,
|
|
expected: 42,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested index greater than tail receives tail",
|
|
req: 500,
|
|
expected: 100,
|
|
offset: 400,
|
|
},
|
|
}
|
|
|
|
// buffer starts at index 11 goes to 100
|
|
b := newEventBuffer(100, nil)
|
|
|
|
for i := 11; i <= 100; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
got, offset := b.StartAtClosest(tc.req)
|
|
require.Equal(t, int(tc.expected), int(got.Events.Index))
|
|
require.Equal(t, tc.offset, offset)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEventBuffer_OnEvict(t *testing.T) {
|
|
called := make(chan struct{})
|
|
testOnEvict := func(events *structs.Events) {
|
|
close(called)
|
|
}
|
|
b := newEventBuffer(2, testOnEvict)
|
|
|
|
// start at 1 since new event buffer is built with a starting sentinel value
|
|
for i := 1; i < 4; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
select {
|
|
case <-called:
|
|
// testOnEvict called
|
|
case <-time.After(100 * time.Millisecond):
|
|
require.Fail(t, "expected testOnEvict to be called")
|
|
}
|
|
}
|