open-nomad/nomad/event_endpoint_test.go

747 lines
19 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nomad
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/mapstructure"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
func TestEventStream(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
// invoke handler
go handler(p2)
// decode request responses
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// retrieve publisher for server, send event
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// Send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(3), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(3 * time.Second)
got := 0
want := 3
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
// ignore heartbeat
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
// decode fully to ensure we received expected out
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
got++
if got == want {
break OUTER
}
}
}
}
// TestEventStream_StreamErr asserts an error is returned when an event publisher
// closes its subscriptions
func TestEventStream_StreamErr(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// close the publishers subscriptions forcing an error
// after an initial event is received
publisher.CloseAll()
if msg.Error == nil {
// continue trying for error
continue
}
require.NotNil(t, msg.Error)
require.Contains(t, msg.Error.Error(), "subscription closed by server")
break OUTER
}
}
}
// TestEventStream_RegionForward tests event streaming from one server
// to another in a different region
func TestEventStream_RegionForward(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
c.Region = "foo"
})
defer cleanupS2()
TestJoin(t, s1, s2)
// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
}
// Query s1 handler
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// publish with server 2
publisher, err := s2.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
}
}
}
func TestEventStream_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
// start server
s, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood)
policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
policyNsNode += "\n" + mock.NodePolicy("read")
tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode)
cases := []struct {
Name string
Token string
Topics map[structs.Topic][]string
Namespace string
ExpectedErr string
PublishFn func(p *stream.EventBroker)
}{
{
Name: "no token",
Token: "",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "job namespace token - correct ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
structs.TopicEvaluation: {"*"},
structs.TopicAllocation: {"*"},
structs.TopicDeployment: {"*"},
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - incorrect ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"}, // good
},
Namespace: "bar", // bad
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request management topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request invalid node topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job+node namespace token, valid",
Token: tokenNsNode.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // good
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}})
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
var ns string
if tc.Namespace != "" {
ns = tc.Namespace
}
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: tc.Topics,
QueryOptions: structs.QueryOptions{
Region: s.Region(),
Namespace: ns,
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
publisher, err := s.State().EventBroker()
require.NoError(err)
// publish some events
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
if tc.PublishFn != nil {
tc.PublishFn(publisher)
}
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for events")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
t.Fatalf("unexpected error %v", msg.Error)
}
}
}
})
}
}
// TestEventStream_ACL_Update_Close_Stream asserts that an active subscription
// is closed after the token is no longer valid
func TestEventStream_ACL_Update_Close_Stream(t *testing.T) {
ci.Parallel(t)
// start server
s1, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s1.RPC)
policyNsGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s1.State(), 1006, "valid", policyNsGood)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: tokenNsFoo.SecretID,
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
job := mock.Job()
jobEvent := structs.JobEvent{
Job: job,
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
// RPC to delete token
aclDelReq := &structs.ACLTokenDeleteRequest{
AccessorIDs: []string{tokenNsFoo.AccessorID},
WriteRequest: structs.WriteRequest{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: root.SecretID,
},
}
var aclResp structs.GenericResponse
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
codec := rpcClient(t, s1)
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
// received a valid event, make RPC to delete token
// continue trying for error
continue
}
errChStream <- msg.Error
return
}
}
}()
// Delete the token used to create the stream
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.DeleteTokens", aclDelReq, &aclResp))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case err := <-errChStream:
// Success
require.Contains(t, err.Error(), stream.ErrSubscriptionClosed.Error())
break OUTER
}
}
}
// TestEventStream_ACLTokenExpiry ensure a subscription does not receive events
// and is closed once the token has expired.
func TestEventStream_ACLTokenExpiry(t *testing.T) {
ci.Parallel(t)
// Start our test server and wait until we have a leader.
testServer, _, testServerCleanup := TestACLServer(t, nil)
defer testServerCleanup()
testutil.WaitForLeader(t, testServer.RPC)
// Create and upsert and ACL token which has a short expiry set.
aclTokenWithExpiry := mock.ACLManagementToken()
aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second))
must.NoError(t, testServer.fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry}))
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: testServer.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: aclTokenWithExpiry.SecretID,
},
}
handler, err := testServer.StreamingRpcHandler("Event.Stream")
must.NoError(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := testServer.State().EventBroker()
must.NoError(t, err)
jobEvent := structs.JobEvent{
Job: mock.Job(),
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
defer cancel()
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
continue
}
errChStream <- msg.Error
return
}
}
}()
// Generate a timeout for the test and for the expiry. The expiry timeout
// is used to trigger an update which will close the subscription as the
// event stream only reacts to change in state.
testTimeout := time.After(4 * time.Second)
expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime))
for {
select {
case <-testTimeout:
t.Fatal("timeout waiting for event stream to close")
case err := <-errCh:
t.Fatal(err)
case <-expiryTimeout:
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
case err := <-errChStream:
// Success
must.StrContains(t, err.Error(), "ACL token expired")
return
}
}
}