open-nomad/command/agent/event_endpoint_test.go
Drew Bailey 4793bb4e01
Events/deployment events (#9004)
* Node Drain events and Node Events (#8980)

Deployment status updates

handle deployment status updates (paused, failed, resume)

deployment alloc health

generate events from apply plan result

txn err check, slim down deployment event

one ndjson line per index

* consolidate down to node event + type

* fix UpdateDeploymentAllocHealth test invocations

* fix test
2020-10-14 12:44:37 -04:00

149 lines
3.2 KiB
Go

package agent
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testEvent struct {
ID string
}
func TestEventStream(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil)
require.Nil(t, err)
resp := httptest.NewRecorder()
respErrCh := make(chan error)
go func() {
_, err = s.Server.EventStream(resp, req)
respErrCh <- err
assert.NoError(t, err)
}()
pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}})
testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
want := `{"ID":"123"}`
if strings.Contains(got, want) {
return true, nil
}
return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
}, func(err error) {
cancel()
require.Fail(t, err.Error())
})
// wait for response to close to prevent race between subscription
// shutdown and server shutdown returning subscription closed by server err
// resp.Close()
cancel()
select {
case err := <-respErrCh:
require.Nil(t, err)
case <-time.After(1 * time.Second):
require.Fail(t, "waiting for request cancellation")
}
})
}
func TestEventStream_QueryParse(t *testing.T) {
t.Parallel()
cases := []struct {
desc string
query string
want map[stream.Topic][]string
wantErr bool
}{
{
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
"*": {"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
"*": {"*"},
},
},
{
desc: "invalid key value formatting",
query: "?topic=NodeDrain:*:*",
wantErr: true,
},
{
desc: "invalid key value formatting no value",
query: "?topic=NodeDrain",
wantErr: true,
},
{
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
"NodeDrain": {"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": {
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
},
},
{
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": {
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
"NodeRegister": {
"*",
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query)
req, err := url.Parse(raw)
require.NoError(t, err)
got, err := parseEventTopics(req.Query())
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}