a2f8605c66
Move the subscription context to Next. context.Context should generally never be stored in a struct because it makes that struct only valid while the context is valid. This is rarely obvious from the caller. Adds a forceClosed channel in place of the old context, and uses the new context as a way for the caller to stop the Subscription blocking. Remove some recursion out of bufferItem.Next. The caller is already looping so we can continue in that loop instead of recursing. This ensures currentItem is updated immediately (which probably does not matter in practice), and also removes the chance that we overflow the stack. NextNoBlock and FollowAfter do not need to handle bufferItem.Err; the caller already handles it. Moves filter to a method to simplify Next, and more explicitly separates filtering from looping. Also improve some godoc. Only unwrap bufferItem.Err when necessary.
90 lines
2.3 KiB
Go
90 lines
2.3 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
fmt "fmt"
|
|
"math/rand"
|
|
"testing"
|
|
time "time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
// A property-based test to ensure that under heavy concurrent use trivial
|
|
// correctness properties are not violated (and that -race doesn't complain).
|
|
func TestEventBufferFuzz(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for short run")
|
|
}
|
|
|
|
nReaders := 1000
|
|
nMessages := 1000
|
|
|
|
b := newEventBuffer()
|
|
|
|
// Start a write goroutine that will publish 10000 messages with sequential
|
|
// indexes and some jitter in timing (to allow clients to "catch up" and block
|
|
// waiting for updates).
|
|
go func() {
|
|
seed := time.Now().UnixNano()
|
|
t.Logf("Using seed %d", seed)
|
|
// z is a Zipfian distribution that gives us a number of milliseconds to
|
|
// sleep which are mostly low - near zero but occasionally spike up to near
|
|
// 100.
|
|
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
|
|
|
|
for i := 0; i < nMessages; i++ {
|
|
// Event content is arbitrary and not valid for our use of buffers in
|
|
// streaming - here we only care about the semantics of the buffer.
|
|
e := Event{
|
|
Index: uint64(i), // Indexes should be contiguous
|
|
Topic: testTopic,
|
|
}
|
|
b.Append([]Event{e})
|
|
// Sleep sometimes for a while to let some subscribers catch up
|
|
wait := time.Duration(z.Uint64()) * time.Millisecond
|
|
time.Sleep(wait)
|
|
}
|
|
}()
|
|
|
|
// Run n subscribers following and verifying
|
|
errCh := make(chan error, nReaders)
|
|
|
|
// Load head here so all subscribers start from the same point or they might
|
|
// not run until several appends have already happened.
|
|
head := b.Head()
|
|
|
|
for i := 0; i < nReaders; i++ {
|
|
go func(i int) {
|
|
expect := uint64(0)
|
|
item := head
|
|
var err error
|
|
for {
|
|
item, err = item.Next(context.Background(), nil)
|
|
if err != nil {
|
|
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
|
|
expect, err)
|
|
return
|
|
}
|
|
if item.Events[0].Index != expect {
|
|
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
|
|
expect, item.Events[0].Index)
|
|
return
|
|
}
|
|
expect++
|
|
if expect == uint64(nMessages) {
|
|
// Succeeded
|
|
errCh <- nil
|
|
return
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for all readers to finish one way or other
|
|
for i := 0; i < nReaders; i++ {
|
|
err := <-errCh
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|