events: fix wildcard namespace handling (#10935)

This fixes a bug in the event stream API where it currently interprets
namespace=* as an actual namespace, not a wildcard. When Nomad parses
incoming requests, it sets namespace to default if not specified, which
means the request namespace will never be an empty string, which is what
the event subscription was checking for. This changes the conditional
logic to check for a wildcard namespace instead of an empty one.

It also updates some event tests to include the default namespace in the
subscription to match current behavior.

Fixes #10903
This commit is contained in:
Isabel Suchanek 2021-09-02 09:36:55 -07:00 committed by GitHub
parent 450d0cb872
commit ab51050ce8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 5 deletions

3
.changelog/10935.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
events: Fixed wildcard namespace handling
```

View File

@ -3378,6 +3378,7 @@ func TestFSM_ACLEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
Namespace: "default",
}
sub, err := broker.Subscribe(subReq)
@ -3431,6 +3432,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
Namespace: "default",
}
sub, err := broker.Subscribe(subReq)

View File

@ -93,6 +93,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "default",
Index: index,
StartExactlyAtIndex: true,
})

View File

@ -270,6 +270,27 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals in all namespaces and removed access",
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Namespace: "foo",
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
@ -467,11 +488,18 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
var ns string
if tc.event.Namespace != "" {
ns = tc.event.Namespace
} else {
ns = structs.DefaultNamespace
}
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: structs.DefaultNamespace,
Namespace: ns,
Token: secretID,
})

View File

@ -123,14 +123,15 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
allTopicKeys := req.Topics[structs.TopicAll]
if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
// Return all events if subscribed to all namespaces and all topics
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
return events
}
var result []structs.Event
for _, event := range events {
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
continue
}

View File

@ -147,6 +147,29 @@ func TestFilter_Namespace(t *testing.T) {
require.Equal(t, 2, cap(actual))
}
func TestFilter_NamespaceAll(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events,
structs.Event{Topic: "Test", Key: "One", Namespace: "foo"},
structs.Event{Topic: "Test", Key: "Two", Namespace: "bar"},
structs.Event{Topic: "Test", Key: "Three", Namespace: "default"},
)
req := &SubscribeRequest{
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "*",
}
actual := filter(req, events)
expected := []structs.Event{
{Topic: "Test", Key: "One", Namespace: "foo"},
{Topic: "Test", Key: "Two", Namespace: "bar"},
{Topic: "Test", Key: "Three", Namespace: "default"},
}
require.Equal(t, expected, actual)
}
func TestFilter_FilterKeys(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"})

View File

@ -47,7 +47,8 @@ by default, requiring a management token.
- `namespace` `(string: "default")` - Specifies the target namespace to filter
on. Specifying `*` includes all namespaces for event types that support
namespaces.
namespaces. If you specify all namespaces (`*`) you'll either need a management
token, or an ACL Policy that explicitly applies to all namespaces (`*`).
- `topic` `(topic:filter_key: "*:*")` - Specifies a topic to subscribe to and
filter on. The default is to subscribe to all topics. Multiple topics may be
@ -100,10 +101,15 @@ by default, requiring a management token.
### Sample Request
```shell-session
# Subscribe to all events and topics
# Subscribe to all events and topics in the default namespace
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream
```
```shell-session
# Subscribe to all events and topics in all namespaces
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?namespace=*
```
```shell-session
# Start at index 100 and subscribe to all Evaluation events
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?index=100&topic=Evaluation