open-nomad/nomad/event_endpoint_test.go
James Rasell db4e2541bd
events: fix event endpoint tests to ignore heartbeats.
seems when this PR was raised, the Nomad CI provider was having
availability issues meaning the test suite was not correctly run,
thus allowing broken tests into main. The PR itself exercised test
code which had not been hit before.

The particular problem is when identifying whether the event
received is a heartbeat; this was performed using standard Golang
conditionals. Unfortunately the operator == is not defined on byte
arrays, resulting in the check always returning false. To overcome
this issue the code now uses the bytes.Equal function to correctly
compare the data.
2021-05-24 10:28:19 +02:00

627 lines
16 KiB
Go

package nomad
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/require"
)
func TestEventStream(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
// invoke handler
go handler(p2)
// decode request responses
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// retrieve publisher for server, send event
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// Send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(3), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(3 * time.Second)
got := 0
want := 3
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
// ignore heartbeat
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
// decode fully to ensure we received expected out
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
got++
if got == want {
break OUTER
}
}
}
}
// TestEventStream_StreamErr asserts an error is returned when an event publisher
// closes its subscriptions
func TestEventStream_StreamErr(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// close the publishers subscriptions forcing an error
// after an initial event is received
publisher.CloseAll()
if msg.Error == nil {
// continue trying for error
continue
}
require.NotNil(t, msg.Error)
require.Contains(t, msg.Error.Error(), "subscription closed by server")
break OUTER
}
}
}
// TestEventStream_RegionForward tests event streaming from one server
// to another in a different region
func TestEventStream_RegionForward(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
c.Region = "foo"
})
defer cleanupS2()
TestJoin(t, s1, s2)
// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
}
// Query s1 handler
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// publish with server 2
publisher, err := s2.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
}
}
}
func TestEventStream_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server
s, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood)
policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
policyNsNode += "\n" + mock.NodePolicy("read")
tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode)
cases := []struct {
Name string
Token string
Topics map[structs.Topic][]string
Namespace string
ExpectedErr string
PublishFn func(p *stream.EventBroker)
}{
{
Name: "no token",
Token: "",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "job namespace token - correct ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
structs.TopicEvaluation: {"*"},
structs.TopicAllocation: {"*"},
structs.TopicDeployment: {"*"},
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - incorrect ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"}, // good
},
Namespace: "bar", // bad
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request management topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request invalid node topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job+node namespace token, valid",
Token: tokenNsNode.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // good
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}})
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
var ns string
if tc.Namespace != "" {
ns = tc.Namespace
}
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: tc.Topics,
QueryOptions: structs.QueryOptions{
Region: s.Region(),
Namespace: ns,
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
publisher, err := s.State().EventBroker()
require.NoError(err)
// publish some events
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
if tc.PublishFn != nil {
tc.PublishFn(publisher)
}
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for events")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
t.Fatalf("unexpected error %v", msg.Error)
}
}
}
})
}
}
// TestEventStream_ACL_Update_Close_Stream asserts that an active subscription
// is closed after the token is no longer valid
func TestEventStream_ACL_Update_Close_Stream(t *testing.T) {
t.Parallel()
// start server
s1, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s1.RPC)
policyNsGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s1.State(), 1006, "valid", policyNsGood)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: tokenNsFoo.SecretID,
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
job := mock.Job()
jobEvent := structs.JobEvent{
Job: job,
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
// RPC to delete token
aclDelReq := &structs.ACLTokenDeleteRequest{
AccessorIDs: []string{tokenNsFoo.AccessorID},
WriteRequest: structs.WriteRequest{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: root.SecretID,
},
}
var aclResp structs.GenericResponse
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
codec := rpcClient(t, s1)
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
// received a valid event, make RPC to delete token
// continue trying for error
continue
}
errChStream <- msg.Error
return
}
}
}()
// Delete the token used to create the stream
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.DeleteTokens", aclDelReq, &aclResp))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case err := <-errChStream:
// Success
require.Contains(t, err.Error(), stream.ErrSubscriptionClosed.Error())
break OUTER
}
}
}