a0b7f05a7b
* Remove Managed Sinks from Nomad Managed Sinks were a beta feature in Nomad 1.0-beta2. During the beta period it was determined that this was not a scalable approach to support community and third party sinks. * update comment * changelog
205 lines
4.8 KiB
Go
205 lines
4.8 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type testEvent struct {
|
|
ID string
|
|
}
|
|
|
|
func TestEventStream(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
httpTest(t, nil, func(s *TestAgent) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil)
|
|
require.Nil(t, err)
|
|
resp := httptest.NewRecorder()
|
|
|
|
respErrCh := make(chan error)
|
|
go func() {
|
|
_, err = s.Server.EventStream(resp, req)
|
|
respErrCh <- err
|
|
assert.NoError(t, err)
|
|
}()
|
|
|
|
pub, err := s.Agent.server.State().EventBroker()
|
|
require.NoError(t, err)
|
|
pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}})
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
got := resp.Body.String()
|
|
want := `{"ID":"123"}`
|
|
if strings.Contains(got, want) {
|
|
return true, nil
|
|
}
|
|
|
|
return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
|
|
}, func(err error) {
|
|
cancel()
|
|
require.Fail(t, err.Error())
|
|
})
|
|
|
|
// wait for response to close to prevent race between subscription
|
|
// shutdown and server shutdown returning subscription closed by server err
|
|
cancel()
|
|
select {
|
|
case err := <-respErrCh:
|
|
require.Nil(t, err)
|
|
case <-time.After(1 * time.Second):
|
|
require.Fail(t, "waiting for request cancellation")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEventStream_NamespaceQuery(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
httpTest(t, nil, func(s *TestAgent) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream?namespace=foo", nil)
|
|
require.Nil(t, err)
|
|
resp := httptest.NewRecorder()
|
|
|
|
respErrCh := make(chan error)
|
|
go func() {
|
|
_, err = s.Server.EventStream(resp, req)
|
|
respErrCh <- err
|
|
assert.NoError(t, err)
|
|
}()
|
|
|
|
pub, err := s.Agent.server.State().EventBroker()
|
|
require.NoError(t, err)
|
|
|
|
badID := uuid.Generate()
|
|
pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: badID}}}})
|
|
pub.Publish(&structs.Events{Index: 101, Events: []structs.Event{{Namespace: "foo", Payload: testEvent{ID: "456"}}}})
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
got := resp.Body.String()
|
|
want := `"Namespace":"foo"`
|
|
if strings.Contains(got, badID) {
|
|
return false, fmt.Errorf("expected non matching namespace to be filtered, got:%v", got)
|
|
}
|
|
if strings.Contains(got, want) {
|
|
return true, nil
|
|
}
|
|
|
|
return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
|
|
}, func(err error) {
|
|
require.Fail(t, err.Error())
|
|
})
|
|
|
|
// wait for response to close to prevent race between subscription
|
|
// shutdown and server shutdown returning subscription closed by server err
|
|
cancel()
|
|
select {
|
|
case err := <-respErrCh:
|
|
require.Nil(t, err)
|
|
case <-time.After(1 * time.Second):
|
|
require.Fail(t, "waiting for request cancellation")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEventStream_QueryParse(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
cases := []struct {
|
|
desc string
|
|
query string
|
|
want map[structs.Topic][]string
|
|
wantErr bool
|
|
}{
|
|
{
|
|
desc: "all topics and keys specified",
|
|
query: "?topic=*:*",
|
|
want: map[structs.Topic][]string{
|
|
"*": {"*"},
|
|
},
|
|
},
|
|
{
|
|
desc: "all topics and keys inferred",
|
|
query: "",
|
|
want: map[structs.Topic][]string{
|
|
"*": {"*"},
|
|
},
|
|
},
|
|
{
|
|
desc: "invalid key value formatting",
|
|
query: "?topic=NodeDrain:*:*",
|
|
wantErr: true,
|
|
},
|
|
{
|
|
desc: "Infer wildcard if absent",
|
|
query: "?topic=NodeDrain",
|
|
wantErr: false,
|
|
want: map[structs.Topic][]string{
|
|
"NodeDrain": {"*"},
|
|
},
|
|
},
|
|
{
|
|
desc: "single topic and key",
|
|
query: "?topic=NodeDrain:*",
|
|
want: map[structs.Topic][]string{
|
|
"NodeDrain": {"*"},
|
|
},
|
|
},
|
|
{
|
|
desc: "single topic multiple keys",
|
|
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
|
|
want: map[structs.Topic][]string{
|
|
"NodeDrain": {
|
|
"*",
|
|
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "multiple topics",
|
|
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
|
|
want: map[structs.Topic][]string{
|
|
"NodeDrain": {
|
|
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
|
|
},
|
|
"NodeRegister": {
|
|
"*",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query)
|
|
req, err := url.Parse(raw)
|
|
require.NoError(t, err)
|
|
|
|
got, err := parseEventTopics(req.Query())
|
|
if tc.wantErr {
|
|
require.Error(t, err)
|
|
return
|
|
}
|
|
require.NoError(t, err)
|
|
require.Equal(t, tc.want, got)
|
|
})
|
|
}
|
|
}
|