open-consul/agent/consul/stream/event_buffer_test.go
Daniel Nephin ef0999547a testing: skip slow tests with -short
Add a skip condition to all tests slower than 100ms.

This change was made using `gotestsum tool slowest` with data from the
last 3 CI runs of master.
See https://github.com/gotestyourself/gotestsum#finding-and-skipping-slow-tests

With this change:

```
$ time go test -count=1 -short ./agent
ok      github.com/hashicorp/consul/agent       0.743s

real    0m4.791s

$ time go test -count=1 -short ./agent/consul
ok      github.com/hashicorp/consul/agent/consul        4.229s

real    0m8.769s
```
2020-12-07 13:42:55 -05:00

94 lines
2.4 KiB
Go

package stream
import (
"context"
fmt "fmt"
"math/rand"
"testing"
time "time"
"github.com/stretchr/testify/assert"
)
// TestEventBufferFuzz is a property-based test to ensure that under heavy
// concurrent use trivial correctness properties are not violated (and that
// -race doesn't complain).
//
// A single writer appends nMessages events with contiguous indexes while
// nReaders subscribers follow the buffer from the same starting item. Each
// subscriber verifies that it observes every index, in order, with no gaps.
func TestEventBufferFuzz(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	nReaders := 1000
	nMessages := 1000

	b := newEventBuffer()

	// Pick and log the seed on the test goroutine so the log call can never
	// happen after the test has already returned.
	seed := time.Now().UnixNano()
	t.Logf("Using seed %d", seed)

	// Start a write goroutine that will publish nMessages messages with
	// sequential indexes and some jitter in timing (to allow clients to "catch
	// up" and block waiting for updates).
	go func() {
		// z is a Zipfian distribution that gives us a number of milliseconds to
		// sleep which are mostly low - near zero but occasionally spike up to
		// the configured maximum of 50.
		z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)

		for i := 0; i < nMessages; i++ {
			// Event content is arbitrary and not valid for our use of buffers in
			// streaming - here we only care about the semantics of the buffer.
			e := Event{
				Index: uint64(i), // Indexes should be contiguous
				Topic: testTopic,
			}
			b.Append([]Event{e})

			// Sleep sometimes for a while to let some subscribers catch up
			wait := time.Duration(z.Uint64()) * time.Millisecond
			time.Sleep(wait)
		}
	}()

	// Run n subscribers following and verifying. The channel is buffered with
	// one slot per reader so a reader never blocks when reporting its result.
	errCh := make(chan error, nReaders)

	// Load head here so all subscribers start from the same point or they might
	// not run until several appends have already happened.
	head := b.Head()

	for i := 0; i < nReaders; i++ {
		go func(i int) {
			expect := uint64(0)
			item := head
			var err error
			for {
				item, err = item.Next(context.Background(), nil)
				if err != nil {
					errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %w", i,
						expect, err)
					return
				}
				// Guard the index below: a panic in this goroutine would abort the
				// whole test binary instead of failing just this test.
				if len(item.Events) == 0 {
					errCh <- fmt.Errorf("subscriber %05d got item with no events, want=%d", i,
						expect)
					return
				}
				if item.Events[0].Index != expect {
					errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
						expect, item.Events[0].Index)
					return
				}
				expect++
				if expect == uint64(nMessages) {
					// Succeeded
					errCh <- nil
					return
				}
			}
		}(i)
	}

	// Wait for all readers to finish one way or other. Every reader sends
	// exactly one value (nil on success), so nReaders receives drain them all.
	for i := 0; i < nReaders; i++ {
		err := <-errCh
		assert.NoError(t, err)
	}
}