open-nomad/api/event_stream_test.go

345 lines
8.3 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package api
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/mitchellh/mapstructure"
"github.com/shoenig/test/must"
)
func TestTopic_String(t *testing.T) {
testutil.Parallel(t)
testCases := []struct {
inputTopic Topic
expectedOutput string
}{
{
inputTopic: TopicDeployment,
expectedOutput: "Deployment",
},
{
inputTopic: TopicEvaluation,
expectedOutput: "Evaluation",
},
{
inputTopic: TopicAllocation,
expectedOutput: "Allocation",
},
{
inputTopic: TopicJob,
expectedOutput: "Job",
},
{
inputTopic: TopicNode,
expectedOutput: "Node",
},
{
inputTopic: TopicNodePool,
expectedOutput: "NodePool",
},
{
inputTopic: TopicService,
expectedOutput: "Service",
},
{
inputTopic: TopicAll,
expectedOutput: "*",
},
}
for _, tc := range testCases {
t.Run(tc.expectedOutput, func(t *testing.T) {
actualOutput := tc.inputTopic.String()
must.Eq(t, tc.expectedOutput, actualOutput)
})
}
}
func TestEvent_Stream(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
must.NoError(t, err)
must.NotNil(t, resp2)
// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
TopicEvaluation: {"*"},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streamCh, err := events.Stream(ctx, topics, 0, q)
must.NoError(t, err)
select {
case event := <-streamCh:
if event.Err != nil {
must.Unreachable(t, must.Sprintf("unexpected %v", event.Err))
}
must.Len(t, 1, event.Events)
must.Eq(t, "Evaluation", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream event"))
}
}
func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
must.NoError(t, err)
must.NotNil(t, resp2)
// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
TopicEvaluation: {"::*"},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err = events.Stream(ctx, topics, 0, q)
must.ErrorContains(t, err, "Invalid key value pair")
}
func TestEvent_Stream_CloseCtx(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
must.NoError(t, err)
must.NotNil(t, resp2)
// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
TopicEvaluation: {"*"},
}
ctx, cancel := context.WithCancel(context.Background())
streamCh, err := events.Stream(ctx, topics, 0, q)
must.NoError(t, err)
// cancel the request
cancel()
select {
case event, ok := <-streamCh:
must.False(t, ok)
must.Nil(t, event)
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream event"))
}
}
func TestEventStream_PayloadValue(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true
})
defer s.Stop()
// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
must.NoError(t, err)
must.NotNil(t, resp2)
// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
TopicNode: {"*"},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streamCh, err := events.Stream(ctx, topics, 0, q)
must.NoError(t, err)
select {
case event := <-streamCh:
if event.Err != nil {
must.NoError(t, err)
}
for _, e := range event.Events {
// verify that we get a node
n, err := e.Node()
must.NoError(t, err)
must.UUIDv4(t, n.ID)
// perform a raw decoding and look for:
// - "ID" to make sure that raw decoding is working correctly
// - "SecretID" to make sure it's not present
raw := make(map[string]map[string]interface{}, 0)
cfg := &mapstructure.DecoderConfig{
Result: &raw,
}
dec, err := mapstructure.NewDecoder(cfg)
must.NoError(t, err)
must.NoError(t, dec.Decode(e.Payload))
must.MapContainsKeys(t, raw, []string{"Node"})
rawNode := raw["Node"]
must.Eq(t, n.ID, rawNode["ID"].(string))
must.Eq(t, "", rawNode["SecretID"])
}
case <-time.After(5 * time.Second):
must.Unreachable(t, must.Sprint("failed waiting for event stream event"))
}
}
func TestEventStream_PayloadValueHelpers(t *testing.T) {
testutil.Parallel(t)
testCases := []struct {
desc string
event Event
input []byte
err string
expectFn func(t *testing.T, event Event)
}{
{
desc: "deployment",
input: []byte(`{"Topic": "Deployment", "Payload": {"Deployment":{"ID":"some-id","JobID":"some-job-id", "TaskGroups": {"tg1": {"RequireProgressBy": "2020-11-05T11:52:54.370774000-05:00"}}}}}`),
expectFn: func(t *testing.T, event Event) {
eventTime, err := time.Parse(time.RFC3339, "2020-11-05T11:52:54.370774000-05:00")
must.NoError(t, err)
must.Eq(t, TopicDeployment, event.Topic)
d, err := event.Deployment()
must.NoError(t, err)
must.Eq(t, &Deployment{
ID: "some-id",
JobID: "some-job-id",
TaskGroups: map[string]*DeploymentState{
"tg1": {
RequireProgressBy: eventTime,
},
},
}, d)
},
},
{
desc: "evaluation",
input: []byte(`{"Topic": "Evaluation", "Payload": {"Evaluation":{"ID":"some-id","Namespace":"some-namespace-id"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicEvaluation, event.Topic)
eval, err := event.Evaluation()
must.NoError(t, err)
must.Eq(t, &Evaluation{
ID: "some-id",
Namespace: "some-namespace-id",
}, eval)
},
},
{
desc: "allocation",
input: []byte(`{"Topic": "Allocation", "Payload": {"Allocation":{"ID":"some-id","Namespace":"some-namespace-id"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicAllocation, event.Topic)
a, err := event.Allocation()
must.NoError(t, err)
must.Eq(t, &Allocation{
ID: "some-id",
Namespace: "some-namespace-id",
}, a)
},
},
{
input: []byte(`{"Topic": "Job", "Payload": {"Job":{"ID":"some-id","Namespace":"some-namespace-id"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicJob, event.Topic)
j, err := event.Job()
must.NoError(t, err)
must.Eq(t, &Job{
ID: pointerOf("some-id"),
Namespace: pointerOf("some-namespace-id"),
}, j)
},
},
{
desc: "node",
input: []byte(`{"Topic": "Node", "Payload": {"Node":{"ID":"some-id","Datacenter":"some-dc-id"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicNode, event.Topic)
n, err := event.Node()
must.NoError(t, err)
must.Eq(t, &Node{
ID: "some-id",
Datacenter: "some-dc-id",
}, n)
},
},
{
desc: "node_pool",
input: []byte(`{"Topic":"NodePool","Payload":{"NodePool":{"Description":"prod pool","Name":"prod"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicNodePool, event.Topic)
n, err := event.NodePool()
must.NoError(t, err)
must.Eq(t, &NodePool{
Name: "prod",
Description: "prod pool",
}, n)
},
},
{
desc: "service",
input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicService, event.Topic)
a, err := event.Service()
must.NoError(t, err)
must.Eq(t, "us-east-1a", a.Datacenter)
must.Eq(t, "some-service-id", a.ID)
must.Eq(t, "some-service-namespace-id", a.Namespace)
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
var out Event
err := json.Unmarshal(tc.input, &out)
must.NoError(t, err)
tc.expectFn(t, out)
})
}
}