d62d8a8587
* Improve managed sink run loop and reloading: resetCh no longer needed; length of buffer equal to count of items, not count of events in each item; update equality fn name; PR feedback; clean up sink manager sink creation * Update test to reflect changes * Bad editor find and replace * PR feedback
237 lines
5.7 KiB
Go
237 lines
5.7 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEventBufferFuzz(t *testing.T) {
|
|
nReaders := 1000
|
|
nMessages := 1000
|
|
|
|
b := newEventBuffer(1000)
|
|
|
|
// Start a write goroutine that will publish 10000 messages with sequential
|
|
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
|
// waiting for updates).
|
|
go func() {
|
|
seed := time.Now().UnixNano()
|
|
t.Logf("Using seed %d", seed)
|
|
// z is a Zipfian distribution that gives us a number of milliseconds to
|
|
// sleep which are mostly low - near zero but occasionally spike up to near
|
|
// 100.
|
|
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
|
|
|
|
for i := 0; i < nMessages; i++ {
|
|
// Event content is arbitrary and not valid for our use of buffers in
|
|
// streaming - here we only care about the semantics of the buffer.
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
// Sleep sometimes for a while to let some subscribers catch up
|
|
wait := time.Duration(z.Uint64()) * time.Millisecond
|
|
time.Sleep(wait)
|
|
}
|
|
}()
|
|
|
|
// Run n subscribers following and verifying
|
|
errCh := make(chan error, nReaders)
|
|
|
|
// Load head here so all subscribers start from the same point or they might
|
|
// not run until several appends have already happened.
|
|
head := b.Head()
|
|
|
|
for i := 0; i < nReaders; i++ {
|
|
go func(i int) {
|
|
expect := uint64(0)
|
|
item := head
|
|
var err error
|
|
for {
|
|
item, err = item.Next(context.Background(), nil)
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
|
expect, err)
|
|
return
|
|
}
|
|
if item.Events.Events[0].Index != expect {
|
|
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
|
|
expect, item.Events.Events[0].Index)
|
|
return
|
|
}
|
|
expect++
|
|
if expect == uint64(nMessages) {
|
|
// Succeeded
|
|
errCh <- nil
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for all readers to finish one way or other
|
|
for i := 0; i < nReaders; i++ {
|
|
err := <-errCh
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func TestEventBuffer_Slow_Reader(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
for i := 1; i < 11; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, b.Len())
|
|
|
|
head := b.Head()
|
|
|
|
for i := 10; i < 15; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
// Ensure the slow reader errors to handle dropped events and
|
|
// fetch latest head
|
|
ev, err := head.Next(context.Background(), nil)
|
|
require.Error(t, err)
|
|
require.Nil(t, ev)
|
|
|
|
newHead := b.Head()
|
|
require.Equal(t, 5, int(newHead.Events.Index))
|
|
}
|
|
|
|
func TestEventBuffer_Size(t *testing.T) {
|
|
b := newEventBuffer(100)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, b.Len())
|
|
}
|
|
|
|
func TestEventBuffer_MaxSize(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
var events []structs.Event
|
|
for i := 0; i < 100; i++ {
|
|
events = append(events, structs.Event{})
|
|
}
|
|
|
|
b.Append(&structs.Events{Index: uint64(1), Events: events})
|
|
require.Equal(t, 1, b.Len())
|
|
}
|
|
|
|
// TestEventBuffer_Emptying_Buffer tests the behavior when all items
|
|
// are removed, the event buffer should advance its head down to the last message
|
|
// and insert a placeholder sentinel value.
|
|
func TestEventBuffer_Emptying_Buffer(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, int(b.Len()))
|
|
|
|
// empty the buffer, which will bring the event buffer down
|
|
// to a single sentinel value
|
|
for i := 0; i < 16; i++ {
|
|
b.advanceHead()
|
|
}
|
|
|
|
// head and tail are now a sentinel value
|
|
head := b.Head()
|
|
tail := b.Tail()
|
|
require.Equal(t, 0, int(head.Events.Index))
|
|
require.Equal(t, 0, b.Len())
|
|
require.Equal(t, head, tail)
|
|
|
|
e := structs.Event{
|
|
Index: uint64(100),
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(100), Events: []structs.Event{e}})
|
|
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
|
|
defer cancel()
|
|
|
|
next, err := head.Next(ctx, make(chan struct{}))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, next)
|
|
require.Equal(t, uint64(100), next.Events.Index)
|
|
|
|
}
|
|
|
|
func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
|
|
cases := []struct {
|
|
desc string
|
|
req uint64
|
|
expected uint64
|
|
offset int
|
|
}{
|
|
{
|
|
desc: "requested index less than head receives head",
|
|
req: 10,
|
|
expected: 11,
|
|
offset: 1,
|
|
},
|
|
{
|
|
desc: "requested exact match head",
|
|
req: 11,
|
|
expected: 11,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested exact match",
|
|
req: 42,
|
|
expected: 42,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested index greater than tail receives tail",
|
|
req: 500,
|
|
expected: 100,
|
|
offset: 400,
|
|
},
|
|
}
|
|
|
|
// buffer starts at index 11 goes to 100
|
|
b := newEventBuffer(100)
|
|
|
|
for i := 11; i <= 100; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
got, offset := b.StartAtClosest(tc.req)
|
|
require.Equal(t, int(tc.expected), int(got.Events.Index))
|
|
require.Equal(t, tc.offset, offset)
|
|
})
|
|
}
|
|
}
|