open-nomad/nomad/stream/event_broker_test.go
Isabel Suchanek ab51050ce8
events: fix wildcard namespace handling (#10935)
This fixes a bug in the event stream API where it currently interprets
namespace=* as an actual namespace, not a wildcard. When Nomad parses
incoming requests, it sets namespace to default if not specified, which
means the request namespace will never be an empty string, which is what
the event subscription was checking for. This changes the conditional
logic to check for a wildcard namespace instead of an empty one.

It also updates some event tests to include the default namespace in the
subscription to match current behavior.

Fixes #10903
2021-09-02 09:36:55 -07:00

622 lines
18 KiB
Go

package stream
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) {
subscription := &SubscribeRequest{
Topics: map[structs.Topic][]string{
"Test": {"sub-key"},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100})
require.NoError(t, err)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []structs.Event{{
Index: 1,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload",
}}
publisher.Publish(&structs.Events{Index: 1, Events: events})
// Subscriber should see the published event
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []structs.Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}}
require.Equal(t, expected, result.Events)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
// Publish a second event
events = []structs.Event{{
Index: 2,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload 2",
}}
publisher.Publish(&structs.Events{Index: 2, Events: events})
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []structs.Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}}
require.Equal(t, expected, result.Events)
}
func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
sub1, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
defer sub1.Unsubscribe()
sub2, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
defer sub2.Unsubscribe()
cancel() // Shutdown
err = consumeSub(context.Background(), sub1)
require.Equal(t, err, ErrSubscriptionClosed)
_, err = sub2.Next(context.Background())
require.Equal(t, err, ErrSubscriptionClosed)
}
// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription
// hanlding behavior when ACLs are disabled (request Token is empty).
// Subscriptions are mapped by their request token. when that token is empty,
// the subscriptions should still be handled indeppendtly of each other when
// unssubscribing.
func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
// first subscription, empty token
sub1, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
defer sub1.Unsubscribe()
// second subscription, empty token
sub2, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
require.NotNil(t, sub2)
sub1.Unsubscribe()
require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state))
}
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
require.NoError(t, err)
sub1, err := publisher.Subscribe(&SubscribeRequest{
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Token: "foo",
})
require.NoError(t, err)
defer sub1.Unsubscribe()
aclEvent := structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenDeleted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}),
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})
for {
_, err := sub1.Next(ctx)
if err == ErrSubscriptionClosed {
break
}
}
out, err := sub1.Next(ctx)
require.Error(t, err)
require.Equal(t, ErrSubscriptionClosed, err)
require.Equal(t, structs.Events{}, out)
}
type fakeACLDelegate struct {
tokenProvider ACLTokenProvider
}
func (d *fakeACLDelegate) TokenProvider() ACLTokenProvider {
return d.tokenProvider
}
type fakeACLTokenProvider struct {
policy *structs.ACLPolicy
policyErr error
token *structs.ACLToken
tokenErr error
}
func (p *fakeACLTokenProvider) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) {
return p.token, p.tokenErr
}
func (p *fakeACLTokenProvider) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) {
return p.policy, p.policyErr
}
func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
secretID := "some-secret-id"
cases := []struct {
policyBeforeRules string
policyAfterRules string
topics map[structs.Topic][]string
desc string
event structs.Event
policyEvent structs.Event
shouldUnsubscribe bool
initialSubErr bool
}{
{
desc: "subscribed to deployments and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to allocs and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{
Allocation: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and removed access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals in all namespaces and removed access",
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Namespace: "foo",
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to allocs and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{
Allocation: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and no access change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyRead),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "initial token insufficient privileges",
initialSubErr: true,
policyBeforeRules: mock.NodePolicy(acl.PolicyDeny),
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and policy change no change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyWrite),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes and policy change no access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes policy deleted",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: "",
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyDeleted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
policy := &structs.ACLPolicy{
Name: "some-policy",
Rules: tc.policyBeforeRules,
}
policy.SetHash()
tokenProvider := &fakeACLTokenProvider{
policy: policy,
token: &structs.ACLToken{
SecretID: secretID,
Policies: []string{policy.Name},
},
}
aclDelegate := &fakeACLDelegate{
tokenProvider: tokenProvider,
}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
var ns string
if tc.event.Namespace != "" {
ns = tc.event.Namespace
} else {
ns = structs.DefaultNamespace
}
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: ns,
Token: secretID,
})
if tc.initialSubErr {
require.Error(t, err)
require.Nil(t, sub)
return
} else {
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
require.NoError(t, err)
// Update the mock provider to use the after rules
policyAfter := &structs.ACLPolicy{
Name: "some-new-policy",
Rules: tc.policyAfterRules,
ModifyIndex: 101, // The ModifyIndex is used to caclulate the acl cache key
}
policyAfter.SetHash()
tokenProvider.policy = policyAfter
// Publish ACL event triggering subscription re-evaluation
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
// Publish another event
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
// If we are expecting to unsubscribe consume the subscription
// until the expected error occurs.
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
if tc.shouldUnsubscribe {
for {
_, err = sub.Next(ctx)
if err != nil {
if err == context.DeadlineExceeded {
require.Fail(t, err.Error())
}
if err == ErrSubscriptionClosed {
break
}
}
}
} else {
_, err = sub.Next(ctx)
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
if tc.shouldUnsubscribe {
require.Equal(t, ErrSubscriptionClosed, err)
} else {
require.NoError(t, err)
}
})
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next(ctx)
eventCh <- subNextResult{
Events: es.Events,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
type subNextResult struct {
Events []structs.Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
t.Helper()
select {
case next := <-eventCh:
return next
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func consumeSub(ctx context.Context, sub *Subscription) error {
for {
_, err := sub.Next(ctx)
if err != nil {
return err
}
}
}