open-nomad/nomad/event_endpoint_test.go
James Rasell da5069bded event stream: ensure token expiry is correctly checked for subs.
This change ensures that a token's expiry is checked before every
event is sent to the caller. Previously, a token could still be
used to listen for events after it had expired, as long as the
subscription was made while it was unexpired. This would last until
the token was garbage collected from state.

The check occurs within the RPC as there is currently no state
update when a token expires.
2022-10-27 13:08:05 -04:00

744 lines
19 KiB
Go

package nomad
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/mapstructure"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
func TestEventStream(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
// invoke handler
go handler(p2)
// decode request responses
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// retrieve publisher for server, send event
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// Send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(3), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(3 * time.Second)
got := 0
want := 3
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
// ignore heartbeat
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
// decode fully to ensure we received expected out
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
got++
if got == want {
break OUTER
}
}
}
}
// TestEventStream_StreamErr asserts an error is returned when an event publisher
// closes its subscriptions
func TestEventStream_StreamErr(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// close the publishers subscriptions forcing an error
// after an initial event is received
publisher.CloseAll()
if msg.Error == nil {
// continue trying for error
continue
}
require.NotNil(t, msg.Error)
require.Contains(t, msg.Error.Error(), "subscription closed by server")
break OUTER
}
}
}
// TestEventStream_RegionForward tests event streaming from one server
// to another in a different region
func TestEventStream_RegionForward(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.EnableEventBroker = true
c.Region = "foo"
})
defer cleanupS2()
TestJoin(t, s1, s2)
// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
}
// Query s1 handler
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// publish with server 2
publisher, err := s2.State().EventBroker()
require.NoError(t, err)
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
timeout := time.After(3 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
if bytes.Equal(msg.Event.Data, stream.JsonHeartbeat.Data) {
continue
}
var event structs.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)
var out structs.Node
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
}
}
}
func TestEventStream_ACL(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
// start server
s, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)
policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood)
policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
policyNsNode += "\n" + mock.NodePolicy("read")
tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode)
cases := []struct {
Name string
Token string
Topics map[structs.Topic][]string
Namespace string
ExpectedErr string
PublishFn func(p *stream.EventBroker)
}{
{
Name: "no token",
Token: "",
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "bad token",
Token: tokenBad.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"},
},
ExpectedErr: structs.ErrPermissionDenied.Error(),
},
{
Name: "job namespace token - correct ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
structs.TopicEvaluation: {"*"},
structs.TopicAllocation: {"*"},
structs.TopicDeployment: {"*"},
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - incorrect ns",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"}, // good
},
Namespace: "bar", // bad
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request management topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicAll: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job namespace token - request invalid node topic",
Token: tokenNsFoo.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // bad
},
Namespace: "foo",
ExpectedErr: structs.ErrPermissionDenied.Error(),
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
},
},
{
Name: "job+node namespace token, valid",
Token: tokenNsNode.SecretID,
Topics: map[structs.Topic][]string{
structs.TopicEvaluation: {"*"}, // good
structs.TopicNode: {"*"}, // good
},
Namespace: "foo",
ExpectedErr: "subscription closed by server",
PublishFn: func(p *stream.EventBroker) {
p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}})
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
var ns string
if tc.Namespace != "" {
ns = tc.Namespace
}
// Create request for all topics and keys
req := structs.EventStreamRequest{
Topics: tc.Topics,
QueryOptions: structs.QueryOptions{
Region: s.Region(),
Namespace: ns,
AuthToken: tc.Token,
},
}
handler, err := s.StreamingRpcHandler("Event.Stream")
require.Nil(err)
// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
publisher, err := s.State().EventBroker()
require.NoError(err)
// publish some events
node := mock.Node()
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
if tc.PublishFn != nil {
tc.PublishFn(publisher)
}
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for events")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
// force error by closing all subscriptions
publisher.CloseAll()
if msg.Error == nil {
continue
}
if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
break OUTER
} else {
t.Fatalf("unexpected error %v", msg.Error)
}
}
}
})
}
}
// TestEventStream_ACL_Update_Close_Stream asserts that an active subscription
// is closed after the token is no longer valid
func TestEventStream_ACL_Update_Close_Stream(t *testing.T) {
ci.Parallel(t)
// start server
s1, root, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s1.RPC)
policyNsGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})
tokenNsFoo := mock.CreatePolicyAndToken(t, s1.State(), 1006, "valid", policyNsGood)
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: tokenNsFoo.SecretID,
},
}
handler, err := s1.StreamingRpcHandler("Event.Stream")
require.Nil(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := s1.State().EventBroker()
require.NoError(t, err)
job := mock.Job()
jobEvent := structs.JobEvent{
Job: job,
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
// RPC to delete token
aclDelReq := &structs.ACLTokenDeleteRequest{
AccessorIDs: []string{tokenNsFoo.AccessorID},
WriteRequest: structs.WriteRequest{
Region: s1.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: root.SecretID,
},
}
var aclResp structs.GenericResponse
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
codec := rpcClient(t, s1)
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
// received a valid event, make RPC to delete token
// continue trying for error
continue
}
errChStream <- msg.Error
return
}
}
}()
// Delete the token used to create the stream
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.DeleteTokens", aclDelReq, &aclResp))
timeout := time.After(5 * time.Second)
OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for event stream")
case err := <-errCh:
t.Fatal(err)
case err := <-errChStream:
// Success
require.Contains(t, err.Error(), stream.ErrSubscriptionClosed.Error())
break OUTER
}
}
}
// TestEventStream_ACLTokenExpiry ensure a subscription does not receive events
// and is closed once the token has expired.
func TestEventStream_ACLTokenExpiry(t *testing.T) {
ci.Parallel(t)
// Start our test server and wait until we have a leader.
testServer, _, testServerCleanup := TestACLServer(t, nil)
defer testServerCleanup()
testutil.WaitForLeader(t, testServer.RPC)
// Create and upsert and ACL token which has a short expiry set.
aclTokenWithExpiry := mock.ACLManagementToken()
aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second))
must.NoError(t, testServer.fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry}))
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: testServer.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: aclTokenWithExpiry.SecretID,
},
}
handler, err := testServer.StreamingRpcHandler("Event.Stream")
must.NoError(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := testServer.State().EventBroker()
must.NoError(t, err)
jobEvent := structs.JobEvent{
Job: mock.Job(),
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
defer cancel()
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
continue
}
errChStream <- msg.Error
return
}
}
}()
// Generate a timeout for the test and for the expiry. The expiry timeout
// is used to trigger an update which will close the subscription as the
// event stream only reacts to change in state.
testTimeout := time.After(4 * time.Second)
expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime))
for {
select {
case <-testTimeout:
t.Fatal("timeout waiting for event stream to close")
case err := <-errCh:
t.Fatal(err)
case <-expiryTimeout:
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
case err := <-errChStream:
// Success
must.StrContains(t, err.Error(), "ACL token expired")
return
}
}
}