open-nomad/nomad/stream/subscription.go
Isabel Suchanek ab51050ce8
events: fix wildcard namespace handling (#10935)
This fixes a bug in the event stream API where it currently interprets
namespace=* as an actual namespace, not a wildcard. When Nomad parses
incoming requests, it sets namespace to default if not specified, which
means the request namespace will never be an empty string, which is what
the event subscription was checking for. This changes the conditional
logic to check for a wildcard namespace instead of an empty one.

It also updates some event tests to include the default namespace in the
subscription to match current behavior.

Fixes #10903
2021-09-02 09:36:55 -07:00

179 lines
4.3 KiB
Go

package stream
import (
"context"
"errors"
"sync/atomic"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// subscriptionStateOpen is the default state of a subscription. An open
// subscription may receive new events.
subscriptionStateOpen uint32 = 0
// subscriptionStateClosed indicates that the subscription was closed, possibly
// as a result of a change to an ACL token, and will not receive new events.
// The subscriber must issue a new Subscribe request.
subscriptionStateClosed uint32 = 1
)
// ErrSubscriptionClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
var ErrACLInvalid = errors.New("Provided ACL token is invalid for requested topics")
type Subscription struct {
// state must be accessed atomically 0 means open, 1 means closed with reload
state uint32
req *SubscribeRequest
// currentItem stores the current buffer item we are on. It
// is mutated by calls to Next.
currentItem *bufferItem
// forceClosed is closed when forceClose is called. It is used by
// EventBroker to cancel Next().
forceClosed chan struct{}
// unsub is a function set by EventBroker that is called to free resources
// when the subscription is no longer needed.
// It must be safe to call the function from multiple goroutines and the function
// must be idempotent.
unsub func()
}
type SubscribeRequest struct {
Token string
Index uint64
Namespace string
Topics map[structs.Topic][]string
// StartExactlyAtIndex specifies if a subscription needs to
// start exactly at the requested Index. If set to false,
// the closest index in the buffer will be returned if there is not
// an exact match
StartExactlyAtIndex bool
}
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
forceClosed: make(chan struct{}),
req: req,
currentItem: item,
unsub: unsub,
}
}
func (s *Subscription) Next(ctx context.Context) (structs.Events, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return structs.Events{}, ErrSubscriptionClosed
}
for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return structs.Events{}, ErrSubscriptionClosed
case err != nil:
return structs.Events{}, err
}
s.currentItem = next
events := filter(s.req, next.Events.Events)
if len(events) == 0 {
continue
}
return structs.Events{Index: next.Events.Index, Events: events}, nil
}
}
func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
}
for {
next := s.currentItem.NextNoBlock()
if next == nil {
return nil, nil
}
s.currentItem = next
events := filter(s.req, next.Events.Events)
if len(events) == 0 {
continue
}
return events, nil
}
}
func (s *Subscription) Unsubscribe() {
s.unsub()
}
// filter events to only those that match a subscriptions topic/keys/namespace
func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
if len(events) == 0 {
return nil
}
allTopicKeys := req.Topics[structs.TopicAll]
// Return all events if subscribed to all namespaces and all topics
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
return events
}
var result []structs.Event
for _, event := range events {
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
continue
}
// *[*] always matches
if len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
result = append(result, event)
continue
}
keys := allTopicKeys
if topicKeys, ok := req.Topics[event.Topic]; ok {
keys = append(keys, topicKeys...)
}
if len(keys) == 1 && keys[0] == string(structs.TopicAll) {
result = append(result, event)
continue
}
for _, key := range keys {
if eventMatchesKey(event, key) {
result = append(result, event)
continue
}
}
}
return result
}
func eventMatchesKey(event structs.Event, key string) bool {
if event.Key == key {
return true
}
for _, fk := range event.FilterKeys {
if fk == key {
return true
}
}
return false
}