114 lines
2.3 KiB
Go
114 lines
2.3 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEvent_Stream(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
streamCh, err := events.Stream(ctx, topics, 0, q)
|
|
require.NoError(t, err)
|
|
|
|
select {
|
|
case event := <-streamCh:
|
|
if event.Err != nil {
|
|
require.Fail(t, err.Error())
|
|
}
|
|
require.Equal(t, len(event.Events), 1)
|
|
require.Equal(t, "Eval", string(event.Events[0].Topic))
|
|
case <-time.After(5 * time.Second):
|
|
require.Fail(t, "failed waiting for event stream event")
|
|
}
|
|
}
|
|
|
|
func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"::*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
_, err = events.Stream(ctx, topics, 0, q)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "400")
|
|
require.Contains(t, err.Error(), "Invalid key value pair")
|
|
}
|
|
|
|
func TestEvent_Stream_CloseCtx(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t, nil, nil)
|
|
defer s.Stop()
|
|
|
|
// register job to generate events
|
|
jobs := c.Jobs()
|
|
job := testJob()
|
|
resp2, _, err := jobs.Register(job, nil)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, resp2)
|
|
|
|
// build event stream request
|
|
events := c.EventStream()
|
|
q := &QueryOptions{}
|
|
topics := map[Topic][]string{
|
|
"Eval": {"*"},
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
streamCh, err := events.Stream(ctx, topics, 0, q)
|
|
require.NoError(t, err)
|
|
|
|
// cancel the request
|
|
cancel()
|
|
|
|
select {
|
|
case event, ok := <-streamCh:
|
|
require.False(t, ok)
|
|
require.Nil(t, event)
|
|
case <-time.After(5 * time.Second):
|
|
require.Fail(t, "failed waiting for event stream event")
|
|
}
|
|
}
|