237 lines
5.7 KiB
Go
237 lines
5.7 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEventBufferFuzz(t *testing.T) {
|
|
nReaders := 1000
|
|
nMessages := 1000
|
|
|
|
b := newEventBuffer(1000)
|
|
|
|
// Start a write goroutine that will publish 10000 messages with sequential
|
|
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
|
// waiting for updates).
|
|
go func() {
|
|
seed := time.Now().UnixNano()
|
|
t.Logf("Using seed %d", seed)
|
|
// z is a Zipfian distribution that gives us a number of milliseconds to
|
|
// sleep which are mostly low - near zero but occasionally spike up to near
|
|
// 100.
|
|
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
|
|
|
|
for i := 0; i < nMessages; i++ {
|
|
// Event content is arbitrary and not valid for our use of buffers in
|
|
// streaming - here we only care about the semantics of the buffer.
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
// Sleep sometimes for a while to let some subscribers catch up
|
|
wait := time.Duration(z.Uint64()) * time.Millisecond
|
|
time.Sleep(wait)
|
|
}
|
|
}()
|
|
|
|
// Run n subscribers following and verifying
|
|
errCh := make(chan error, nReaders)
|
|
|
|
// Load head here so all subscribers start from the same point or they might
|
|
// not run until several appends have already happened.
|
|
head := b.Head()
|
|
|
|
for i := 0; i < nReaders; i++ {
|
|
go func(i int) {
|
|
expect := uint64(0)
|
|
item := head
|
|
var err error
|
|
for {
|
|
item, err = item.Next(context.Background(), nil)
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
|
expect, err)
|
|
return
|
|
}
|
|
if item.Events.Events[0].Index != expect {
|
|
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
|
|
expect, item.Events.Events[0].Index)
|
|
return
|
|
}
|
|
expect++
|
|
if expect == uint64(nMessages) {
|
|
// Succeeded
|
|
errCh <- nil
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for all readers to finish one way or other
|
|
for i := 0; i < nReaders; i++ {
|
|
err := <-errCh
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func TestEventBuffer_Slow_Reader(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
for i := 1; i < 11; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, b.Len())
|
|
|
|
head := b.Head()
|
|
|
|
for i := 10; i < 15; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
// Ensure the slow reader errors to handle dropped events and
|
|
// fetch latest head
|
|
ev, err := head.Next(context.Background(), nil)
|
|
require.Error(t, err)
|
|
require.Nil(t, ev)
|
|
|
|
newHead := b.Head()
|
|
require.Equal(t, 5, int(newHead.Events.Index))
|
|
}
|
|
|
|
func TestEventBuffer_Size(t *testing.T) {
|
|
b := newEventBuffer(100)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, b.Len())
|
|
}
|
|
|
|
func TestEventBuffer_MaxSize(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
var events []structs.Event
|
|
for i := 0; i < 100; i++ {
|
|
events = append(events, structs.Event{})
|
|
}
|
|
|
|
b.Append(&structs.Events{Index: uint64(1), Events: events})
|
|
require.Equal(t, 1, b.Len())
|
|
}
|
|
|
|
// TestEventBuffer_Emptying_Buffer tests the behavior when all items
|
|
// are removed, the event buffer should advance its head down to the last message
|
|
// and insert a placeholder sentinel value.
|
|
func TestEventBuffer_Emptying_Buffer(t *testing.T) {
|
|
b := newEventBuffer(10)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
require.Equal(t, 10, int(b.Len()))
|
|
|
|
// empty the buffer, which will bring the event buffer down
|
|
// to a single sentinel value
|
|
for i := 0; i < 16; i++ {
|
|
b.advanceHead()
|
|
}
|
|
|
|
// head and tail are now a sentinel value
|
|
head := b.Head()
|
|
tail := b.Tail()
|
|
require.Equal(t, 0, int(head.Events.Index))
|
|
require.Equal(t, 0, b.Len())
|
|
require.Equal(t, head, tail)
|
|
|
|
e := structs.Event{
|
|
Index: uint64(100),
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(100), Events: []structs.Event{e}})
|
|
|
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
|
|
defer cancel()
|
|
|
|
next, err := head.Next(ctx, make(chan struct{}))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, next)
|
|
require.Equal(t, uint64(100), next.Events.Index)
|
|
|
|
}
|
|
|
|
func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) {
|
|
cases := []struct {
|
|
desc string
|
|
req uint64
|
|
expected uint64
|
|
offset int
|
|
}{
|
|
{
|
|
desc: "requested index less than head receives head",
|
|
req: 10,
|
|
expected: 11,
|
|
offset: 1,
|
|
},
|
|
{
|
|
desc: "requested exact match head",
|
|
req: 11,
|
|
expected: 11,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested exact match",
|
|
req: 42,
|
|
expected: 42,
|
|
offset: 0,
|
|
},
|
|
{
|
|
desc: "requested index greater than tail receives tail",
|
|
req: 500,
|
|
expected: 100,
|
|
offset: 400,
|
|
},
|
|
}
|
|
|
|
// buffer starts at index 11 goes to 100
|
|
b := newEventBuffer(100)
|
|
|
|
for i := 11; i <= 100; i++ {
|
|
e := structs.Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
}
|
|
b.Append(&structs.Events{Index: uint64(i), Events: []structs.Event{e}})
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
got, offset := b.StartAtClosest(tc.req)
|
|
require.Equal(t, int(tc.expected), int(got.Events.Index))
|
|
require.Equal(t, tc.offset, offset)
|
|
})
|
|
}
|
|
}
|