c463479848
properly wire up durable event count move newline responsibility moves newline creation from NDJson to the http handler, json stream only encodes and sends now ignore snapshot restore if broker is disabled enable dev mode to access event steam without acl use mapping instead of switch use pointers for config sizes, remove unused ttl, simplify closed conn logic
114 lines
2.3 KiB
Go
114 lines
2.3 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEvent_Stream(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
streamCh, err := events.Stream(ctx, topics, 0, q)
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case event := <-streamCh:
|
|
if event.Err != nil {
|
|
require.Fail(t, err.Error())
|
|
}
|
|
require.Equal(t, len(event.Events), 1)
|
|
require.Equal(t, "Eval", string(event.Events[0].Topic))
|
|
case <-time.After(5 * time.Second):
|
|
require.Fail(t, "failed waiting for event stream event")
|
|
}
|
|
}
|
|
|
|
func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"::*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
_, err = events.Stream(ctx, topics, 0, q)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "400")
|
|
require.Contains(t, err.Error(), "Invalid key value pair")
|
|
}
|
|
|
|
func TestEvent_Stream_CloseCtx(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
streamCh, err := events.Stream(ctx, topics, 0, q)
|
|
require.NoError(t, err)
|
|
|
|
// cancel the request
|
|
cancel()
|
|
|
|
select {
|
|
case event, ok := <-streamCh:
|
|
require.False(t, ok)
|
|
require.Nil(t, event)
|
|
case <-time.After(5 * time.Second):
|
|
require.Fail(t, "failed waiting for event stream event")
|
|
}
|
|
}
|