178 lines
4.3 KiB
Go
178 lines
4.3 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync/atomic"
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// Lifecycle states held in Subscription.state. The field is read with
// atomic loads, so the constants share its uint32 type.
const (
	// subscriptionStateOpen is the zero value and therefore the default
	// state of a subscription. An open subscription may receive new events.
	subscriptionStateOpen uint32 = iota

	// subscriptionStateClosed marks a subscription that has been closed,
	// possibly because of a change to an ACL token. A closed subscription
	// will not receive new events; the subscriber must issue a new
	// Subscribe request.
	subscriptionStateClosed
)
|
|
|
|
var (
	// ErrSubscriptionClosed is an error signalling that the subscription has
	// been closed. The client should Unsubscribe, then re-Subscribe.
	ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")

	// ErrACLInvalid is an error signalling that the provided ACL token is
	// not valid for the requested topics.
	ErrACLInvalid = errors.New("Provided ACL token is invalid for requested topics")
)
|
|
|
|
type Subscription struct {
|
|
// state must be accessed atomically 0 means open, 1 means closed with reload
|
|
state uint32
|
|
|
|
req *SubscribeRequest
|
|
|
|
// currentItem stores the current buffer item we are on. It
|
|
// is mutated by calls to Next.
|
|
currentItem *bufferItem
|
|
|
|
// forceClosed is closed when forceClose is called. It is used by
|
|
// EventBroker to cancel Next().
|
|
forceClosed chan struct{}
|
|
|
|
// unsub is a function set by EventBroker that is called to free resources
|
|
// when the subscription is no longer needed.
|
|
// It must be safe to call the function from multiple goroutines and the function
|
|
// must be idempotent.
|
|
unsub func()
|
|
}
|
|
|
|
type SubscribeRequest struct {
|
|
Token string
|
|
Index uint64
|
|
Namespace string
|
|
|
|
Topics map[structs.Topic][]string
|
|
|
|
// StartExactlyAtIndex specifies if a subscription needs to
|
|
// start exactly at the requested Index. If set to false,
|
|
// the closest index in the buffer will be returned if there is not
|
|
// an exact match
|
|
StartExactlyAtIndex bool
|
|
}
|
|
|
|
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
|
|
return &Subscription{
|
|
forceClosed: make(chan struct{}),
|
|
req: req,
|
|
currentItem: item,
|
|
unsub: unsub,
|
|
}
|
|
}
|
|
|
|
func (s *Subscription) Next(ctx context.Context) (structs.Events, error) {
|
|
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
|
return structs.Events{}, ErrSubscriptionClosed
|
|
}
|
|
|
|
for {
|
|
next, err := s.currentItem.Next(ctx, s.forceClosed)
|
|
switch {
|
|
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
|
|
return structs.Events{}, ErrSubscriptionClosed
|
|
case err != nil:
|
|
return structs.Events{}, err
|
|
}
|
|
s.currentItem = next
|
|
|
|
events := filter(s.req, next.Events.Events)
|
|
if len(events) == 0 {
|
|
continue
|
|
}
|
|
return structs.Events{Index: next.Events.Index, Events: events}, nil
|
|
}
|
|
}
|
|
|
|
func (s *Subscription) NextNoBlock() ([]structs.Event, error) {
|
|
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
|
|
return nil, ErrSubscriptionClosed
|
|
}
|
|
|
|
for {
|
|
next := s.currentItem.NextNoBlock()
|
|
if next == nil {
|
|
return nil, nil
|
|
}
|
|
s.currentItem = next
|
|
|
|
events := filter(s.req, next.Events.Events)
|
|
if len(events) == 0 {
|
|
continue
|
|
}
|
|
return events, nil
|
|
}
|
|
}
|
|
|
|
func (s *Subscription) Unsubscribe() {
|
|
s.unsub()
|
|
}
|
|
|
|
// filter events to only those that match a subscriptions topic/keys/namespace
|
|
func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
|
|
if len(events) == 0 {
|
|
return nil
|
|
}
|
|
|
|
allTopicKeys := req.Topics[structs.TopicAll]
|
|
|
|
if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
|
|
return events
|
|
}
|
|
|
|
var result []structs.Event
|
|
|
|
for _, event := range events {
|
|
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
|
|
continue
|
|
}
|
|
|
|
// *[*] always matches
|
|
if len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
|
|
result = append(result, event)
|
|
continue
|
|
}
|
|
|
|
keys := allTopicKeys
|
|
|
|
if topicKeys, ok := req.Topics[event.Topic]; ok {
|
|
keys = append(keys, topicKeys...)
|
|
}
|
|
|
|
if len(keys) == 1 && keys[0] == string(structs.TopicAll) {
|
|
result = append(result, event)
|
|
continue
|
|
}
|
|
|
|
for _, key := range keys {
|
|
if eventMatchesKey(event, key) {
|
|
result = append(result, event)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func eventMatchesKey(event structs.Event, key string) bool {
|
|
if event.Key == key {
|
|
return true
|
|
}
|
|
|
|
for _, fk := range event.FilterKeys {
|
|
if fk == key {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|