open-consul/agent/consul/stream/event.go

145 lines
3.9 KiB
Go
Raw Normal View History

/*
Package stream provides a publish/subscribe system for events produced by changes
to the state store.
*/
package stream
import (
"fmt"
"github.com/hashicorp/consul/acl"
)
// Topic is an identifier that partitions events. A subscription will only receive
// events which match the Topic.
//
// Topic is declared with fmt.Stringer as its underlying type, so any value
// that has a String() string method can be used as a Topic.
type Topic fmt.Stringer
// Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.
type Event struct {
// Topic identifies the partition of events this Event belongs to.
Topic Topic
// Index is a numeric identifier carried with the Event.
// NOTE(review): presumably the state store index of the change that
// produced the event - confirm against the publisher.
Index uint64
// Payload carries the event contents. Its methods report whether the Event
// matches a subscription's key and namespace and whether it is readable
// under a given acl.Authorizer.
Payload Payload
}
type Payload interface {
2020-11-06 00:18:37 +00:00
// MatchesKey must return true if the Payload should be included in a subscription
// requested with the key and namespace.
// Generally this means that the payload matches the key and namespace or
// the payload is a special framing event that should be returned to every
// subscription.
2020-11-06 00:18:37 +00:00
MatchesKey(key, namespace string) bool
// HasReadPermission uses the acl.Authorizer to determine if the items in the
// Payload are visible to the request. It returns true if the payload is
// authorized for Read, otherwise returns false.
HasReadPermission(authz acl.Authorizer) bool
}
// PayloadEvents is a Payload which contains multiple Events.
//
// The MatchesKey and HasReadPermission methods on *PayloadEvents may replace
// Items with a filtered slice, so calling them can change the contents of the
// value.
type PayloadEvents struct {
// Items holds the wrapped events.
Items []Event
}
// NewPayloadEvents wraps the given events in a newly allocated PayloadEvents
// and returns a pointer to it. The items slice is stored directly rather than
// copied, so it shares its backing array with the caller's slice when one is
// passed with the items... form.
func NewPayloadEvents(items ...Event) *PayloadEvents {
	payload := new(PayloadEvents)
	payload.Items = items
	return payload
}
// filter keeps only the events in p.Items for which f returns true.
//
// It returns false when no event is accepted (including when Items is empty)
// and true otherwise. Items is only replaced when some, but not all, events
// are accepted; when every event or no event is accepted the slice is left
// untouched.
//
// f is invoked once per event to count the survivors and, if a new slice has
// to be built, once more per event to collect them.
//
// NOTE(review): the receiver is modified in place. If the same PayloadEvents
// value is reachable from more than one consumer, all of them will observe
// the filtered list - confirm this is intended by the callers.
func (p *PayloadEvents) filter(f func(Event) bool) bool {
	all := p.Items

	// Counting pass: learn the exact result size up front so that at most one
	// allocation is made, at the price of evaluating f a second time when only
	// a subset of the events is accepted.
	keep := 0
	for _, ev := range all {
		if f(ev) {
			keep++
		}
	}

	switch keep {
	case 0:
		// Nothing accepted; leave Items as it was.
		return false
	case len(all):
		// Everything accepted; no need to rebuild the slice.
		return true
	}

	// Collecting pass: copy the accepted events into an exactly sized slice.
	kept := make([]Event, 0, keep)
	for _, ev := range all {
		if !f(ev) {
			continue
		}
		kept = append(kept, ev)
	}
	p.Items = kept
	return true
}
2020-11-06 00:18:37 +00:00
func (p *PayloadEvents) MatchesKey(key, namespace string) bool {
return p.filter(func(event Event) bool {
2020-11-06 00:18:37 +00:00
return event.Payload.MatchesKey(key, namespace)
})
}
// Len returns the number of events currently held in Items.
func (p *PayloadEvents) Len() int {
return len(p.Items)
}
// HasReadPermission filters Items down to the events whose Payload reports
// read permission for authz. It returns true if at least one event is
// readable and false otherwise; when no event is readable, Items is left
// unmodified.
func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
	readable := func(ev Event) bool {
		return ev.Payload.HasReadPermission(authz)
	}
	return p.filter(readable)
}
// IsEndOfSnapshot reports whether this Event is the framing event that marks
// the end of a snapshot. Events returned by Subscription.Next after it are
// streamed as they occur.
func (e Event) IsEndOfSnapshot() bool {
	// endOfSnapshot carries no data, so checking the payload's dynamic type is
	// the same as comparing it with endOfSnapshot{}.
	_, isEnd := e.Payload.(endOfSnapshot)
	return isEnd
}
// IsNewSnapshotToFollow reports whether this Event is the framing event that
// tells the client its view is stale and must be reset. Subsequent events
// from Subscription.Next will be a new snapshot, followed by an EndOfSnapshot
// event.
func (e Event) IsNewSnapshotToFollow() bool {
	// newSnapshotToFollow carries no data, so checking the payload's dynamic
	// type is the same as comparing it with newSnapshotToFollow{}.
	_, isReset := e.Payload.(newSnapshotToFollow)
	return isReset
}
// framingEvent is an empty type embedded by endOfSnapshot and
// newSnapshotToFollow. Its MatchesKey and HasReadPermission methods always
// return true.
type framingEvent struct{}
2020-11-06 00:18:37 +00:00
func (framingEvent) MatchesKey(string, string) bool {
return true
}
// HasReadPermission always returns true; the authorizer is not consulted.
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
return true
}
// endOfSnapshot is the payload that Event.IsEndOfSnapshot checks for. It
// embeds framingEvent and has no fields of its own.
type endOfSnapshot struct {
framingEvent
}
// newSnapshotToFollow is the payload that Event.IsNewSnapshotToFollow checks
// for. It embeds framingEvent and has no fields of its own.
type newSnapshotToFollow struct {
framingEvent
}
// closeSubscriptionPayload is the payload of the Event returned by
// NewCloseSubscriptionEvent. Its MatchesKey and HasReadPermission methods
// always return false.
type closeSubscriptionPayload struct {
// tokensSecretIDs holds the token secret IDs given to
// NewCloseSubscriptionEvent; it may contain duplicates.
tokensSecretIDs []string
}
2020-11-06 00:18:37 +00:00
func (closeSubscriptionPayload) MatchesKey(string, string) bool {
return false
}
// HasReadPermission always returns false; the authorizer is not consulted.
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
return false
}
// NewCloseSubscriptionEvent builds a special Event that is handled inside the
// stream package and is never delivered to subscribers. EventProcessor
// handles these events by closing every subscription that was created with a
// token matching any of the tokenSecretIDs.
//
// Only the Payload of the returned Event is set; Topic and Index keep their
// zero values. The tokenSecretIDs slice is stored as given (not copied) and
// may contain duplicate IDs.
func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event {
	payload := closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}
	return Event{Payload: payload}
}