package stream

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// EventPublisher receives change events from Publish, and sends the events to
// all subscribers of the event Topic.
type EventPublisher struct {
	// snapCacheTTL controls how long we keep snapshots in our cache before
	// allowing them to be garbage collected and a new one made for subsequent
	// requests for that topic and key. In general this should be pretty short to
	// keep the memory overhead of duplicated event data low - snapshots are
	// typically not that expensive, but having a cache for a few seconds can help
	// de-duplicate building the same snapshot over and over again when a
	// thundering herd of watchers all subscribe to the same topic within a few
	// seconds.
	snapCacheTTL time.Duration

	// lock protects the snapCache, topicBuffers and topicBuffer.refs.
	lock sync.RWMutex

	// topicBuffers stores the head of the linked-list buffers to publish events to
	// for a topic.
	topicBuffers map[topicSubject]*topicBuffer

	// snapCache is a cache of eventSnapshots indexed by topic and subject.
	// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
	snapCache map[topicSubject]*eventSnapshot

	subscriptions *subscriptions

	// publishCh is used to send messages from an active txn to a goroutine which
	// publishes events, so that publishing can happen asynchronously from
	// the Commit call in the FSM hot path.
	publishCh chan []Event

	snapshotHandlers SnapshotHandlers

	// wildcards contains the map keys used to access the buffer for a topic's
	// wildcard subject; it is used to track which topics support wildcard
	// subscriptions.
	wildcards map[Topic]topicSubject
}

// topicSubject is used as a map key when accessing topic buffers and cached
// snapshots.
type topicSubject struct {
	Topic   string
	Subject string
}

type subscriptions struct {
	// lock for byToken. If both subscription.lock and EventPublisher.lock need
	// to be held, EventPublisher.lock MUST always be acquired first.
	lock sync.RWMutex

	// byToken is a mapping of active Subscriptions indexed by the token and
	// a pointer to the request.
	// When the token is modified all subscriptions under that token will be
	// reloaded.
	// A subscription may be unsubscribed by using the pointer to the request.
	byToken map[string]map[*SubscribeRequest]*Subscription
}

// topicBuffer augments the eventBuffer with a reference counter, enabling
// cleanup of unused buffers once there are no longer any subscribers for
// the given topic and key.
type topicBuffer struct {
	refs int // refs is guarded by EventPublisher.lock.
	buf  *eventBuffer
}

// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using
// SnapshotAppender.
// The nil Topic is reserved and should not be used.
type SnapshotHandlers map[Topic]SnapshotFunc

// SnapshotFunc builds a snapshot for the subscription request, and appends the
// events to the Snapshot using SnapshotAppender.
//
// Note: index MUST NOT be zero if any events were appended.
type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)

// SnapshotAppender appends groups of events to create a Snapshot of state.
type SnapshotAppender interface {
	// Append events to the snapshot. Every event in the slice must have the same
	// Index, indicating that it is part of the same raft transaction.
	Append(events []Event)
}
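
// The sketch below is a minimal SnapshotFunc illustrating the contract
// documented above: all events in one Append call share the same index, and
// the returned index must be non-zero whenever events were appended. The
// lookupAll helper and the item payload type are hypothetical stand-ins for a
// real state-store query.
//
//	func exampleSnapshot(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
//		items, index, err := lookupAll(req.Subject.String()) // hypothetical query
//		if err != nil {
//			return 0, err
//		}
//		events := make([]Event, 0, len(items))
//		for _, item := range items {
//			events = append(events, Event{
//				Topic:   req.Topic,
//				Index:   index,
//				Payload: item, // must implement Payload and be treated as immutable
//			})
//		}
//		buf.Append(events)
//		return index, nil // index MUST NOT be zero if any events were appended
//	}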

// NewEventPublisher returns an EventPublisher for publishing change events.
// Handlers are used to convert the memDB changes into events.
// Run must be called (normally in a goroutine) to publish events to all
// subscribers. Cancelling the context passed to Run shuts the goroutine down,
// to free resources, and stops all publishing.
func NewEventPublisher(snapCacheTTL time.Duration) *EventPublisher {
	e := &EventPublisher{
		snapCacheTTL: snapCacheTTL,
		topicBuffers: make(map[topicSubject]*topicBuffer),
		snapCache:    make(map[topicSubject]*eventSnapshot),
		publishCh:    make(chan []Event, 64),
		subscriptions: &subscriptions{
			byToken: make(map[string]map[*SubscribeRequest]*Subscription),
		},
		snapshotHandlers: make(map[Topic]SnapshotFunc),
		wildcards:        make(map[Topic]topicSubject),
	}

	return e
}
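
// A minimal wiring sketch (exampleTopic and exampleSnapshot are hypothetical):
// register all handlers before starting Run, since handler registration is not
// synchronized, then run the publishing goroutine.
//
//	publisher := NewEventPublisher(10 * time.Second)
//	if err := publisher.RegisterHandler(exampleTopic, exampleSnapshot, false); err != nil {
//		return err
//	}
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel() // stops the goroutine and closes all subscriptions
//	go publisher.Run(ctx)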

// RegisterHandler will register a new snapshot handler function. The expectation is
// that all handlers get registered prior to the event publisher being Run. Handler
// registration is therefore not concurrency safe and access to handlers is internally
// not synchronized. Passing supportsWildcard allows consumers to subscribe to events
// on this topic with *any* subject (by requesting SubjectWildcard), but this must be
// supported by the handler function.
func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supportsWildcard bool) error {
	if topic.String() == "" {
		return fmt.Errorf("the topic cannot be empty")
	}

	if _, found := e.snapshotHandlers[topic]; found {
		return fmt.Errorf("a handler is already registered for the topic: %s", topic.String())
	}

	e.snapshotHandlers[topic] = handler

	if supportsWildcard {
		e.wildcards[topic] = topicSubject{
			Topic:   topic.String(),
			Subject: SubjectWildcard.String(),
		}
	}

	return nil
}
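
// Sketch of a wildcard registration and subscription (exampleTopic,
// exampleSnapshot, and token are hypothetical). The handler must be able to
// build a snapshot covering every subject when req.Subject is SubjectWildcard.
//
//	_ = publisher.RegisterHandler(exampleTopic, exampleSnapshot, true)
//	sub, err := publisher.Subscribe(&SubscribeRequest{
//		Topic:   exampleTopic,
//		Subject: SubjectWildcard,
//		Token:   token,
//	})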

func (e *EventPublisher) RefreshTopic(topic Topic) error {
	if _, found := e.snapshotHandlers[topic]; !found {
		return fmt.Errorf("topic %s is not registered", topic)
	}

	e.forceEvictByTopic(topic)
	e.subscriptions.closeAllByTopic(topic)

	return nil
}

// Publish events to all subscribers of the event Topic. The events will be shared
// with all subscriptions, so the Payload used in Event.Payload must be immutable.
func (e *EventPublisher) Publish(events []Event) {
	if len(events) == 0 {
		return
	}

	for idx, event := range events {
		if _, ok := event.Payload.(closeSubscriptionPayload); ok {
			continue
		}

		if event.Payload.Subject() == SubjectWildcard {
			panic(fmt.Sprintf("SubjectWildcard can only be used for subscription, not for publishing (topic: %s, index: %d)", event.Topic, idx))
		}
	}

	e.publishCh <- events
}
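
// Sketch of the producer side (raftIndex and payload are hypothetical; the
// payload must implement Payload with a non-wildcard Subject). Because events
// are shared with every subscriber, the payload must never be mutated after
// this call.
//
//	publisher.Publish([]Event{{
//		Topic:   exampleTopic,
//		Index:   raftIndex,
//		Payload: payload,
//	}})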

// Run the event publisher until ctx is cancelled. Run should be called from a
// goroutine to forward events from Publish to all the appropriate subscribers.
func (e *EventPublisher) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			e.subscriptions.closeAll()
			return
		case update := <-e.publishCh:
			e.publishEvent(update)
		}
	}
}

// publishEvent appends the events to any applicable topic buffers. It handles
// any closeSubscriptionPayload events by closing associated subscriptions.
func (e *EventPublisher) publishEvent(events []Event) {
	groupedEvents := make(map[topicSubject][]Event)
	for _, event := range events {
		if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
			e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
			continue
		}

		groupKey := topicSubject{
			Topic:   event.Topic.String(),
			Subject: event.Payload.Subject().String(),
		}
		groupedEvents[groupKey] = append(groupedEvents[groupKey], event)

		// If the topic supports wildcard subscribers, copy the events to a wildcard
		// buffer too.
		e.lock.Lock()
		wildcard, ok := e.wildcards[event.Topic]
		e.lock.Unlock()
		if ok {
			groupedEvents[wildcard] = append(groupedEvents[wildcard], event)
		}
	}

	e.lock.Lock()
	defer e.lock.Unlock()
	for groupKey, events := range groupedEvents {
		// Note: bufferForPublishing returns nil if there are no subscribers for the
		// given topic and subject, in which case events will be dropped on the floor and
		// future subscribers will catch up by consuming the snapshot.
		if buf := e.bufferForPublishing(groupKey); buf != nil {
			buf.Append(events)
		}
	}
}

// bufferForSubscription returns the topic event buffer to which events for the
// given topic and key will be appended. If no such buffer exists, a new buffer
// will be created.
//
// Warning: e.lock MUST be held when calling this function.
func (e *EventPublisher) bufferForSubscription(key topicSubject) *topicBuffer {
	buf, ok := e.topicBuffers[key]
	if !ok {
		buf = &topicBuffer{
			buf: newEventBuffer(),
		}
		e.topicBuffers[key] = buf
	}

	return buf
}

// bufferForPublishing returns the event buffer to which events for the given
// topic and key should be appended. nil will be returned if there are no
// subscribers for the given topic and key.
//
// Warning: e.lock MUST be held when calling this function.
func (e *EventPublisher) bufferForPublishing(key topicSubject) *eventBuffer {
	buf, ok := e.topicBuffers[key]
	if !ok {
		return nil
	}
	return buf.buf
}

// Subscribe returns a new Subscription for the given request. A subscription
// will receive an initial snapshot of events matching the request if req.Index
// is 0, or if the requested index is no longer in the topic buffer. After the
// snapshot, events will be streamed as they are created.
// Subscriptions may be closed, forcing the client to resubscribe (for example if
// ACL policies changed or the state store is abandoned).
//
// When the caller is finished with the subscription for any reason, it must
// call Subscription.Unsubscribe to free ACL tracking resources.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
	e.lock.Lock()
	defer e.lock.Unlock()

	handler, ok := e.snapshotHandlers[req.Topic]
	if !ok || req.Topic == nil {
		return nil, fmt.Errorf("unknown topic %v", req.Topic)
	}

	if req.Subject == SubjectWildcard {
		if _, supportsWildcard := e.wildcards[req.Topic]; !supportsWildcard {
			return nil, fmt.Errorf("topic %s does not support wildcard subscriptions", req.Topic)
		}
	}

	topicBuf := e.bufferForSubscription(req.topicSubject())
	topicBuf.refs++

	// freeBuf is used to free the topic buffer once there are no remaining
	// subscribers for the given topic and key.
	//
	// Note: it's called by Subscription.Unsubscribe which has its own side-effects
	// that are made without holding e.lock (so there's a moment where the ref
	// counter is inconsistent with the subscription map). In practice this is
	// fine; we don't need these things to be strongly consistent. The alternative
	// would be to hold both locks, which introduces the risk of deadlocks.
	freeBuf := func() {
		e.lock.Lock()
		defer e.lock.Unlock()

		topicBuf.refs--

		if topicBuf.refs == 0 {
			delete(e.topicBuffers, req.topicSubject())

			// Evict the cached snapshot too, because the topic buffer will have been
			// spliced onto it. If we don't do this, any new subscribers started before
			// the cache TTL is reached will get "stuck" waiting on the old buffer.
			delete(e.snapCache, req.topicSubject())
		}
	}

	topicHead := topicBuf.buf.Head()

	// If the client view is fresh, resume the stream.
	if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
		buf := newEventBuffer()
		subscriptionHead := buf.Head()
		// Splice the rest of the topic buffer onto the subscription buffer so
		// the subscription will receive new events.
		next, _ := topicHead.NextNoBlock()
		buf.AppendItem(next)
		return e.subscriptions.add(req, subscriptionHead, freeBuf), nil
	}

	snapFromCache := e.getCachedSnapshotLocked(req)
	if snapFromCache == nil {
		snap := newEventSnapshot()
		snap.appendAndSplice(*req, handler, topicHead)
		e.setCachedSnapshotLocked(req, snap)
		snapFromCache = snap
	}

	// If req.Index is 0 the client has no view; send a full snapshot.
	if req.Index == 0 {
		return e.subscriptions.add(req, snapFromCache.First, freeBuf), nil
	}

	// Otherwise the request has an Index but the client view is stale, so it must
	// be reset with a NewSnapshotToFollow event.
	result := newEventSnapshot()
	result.buffer.Append([]Event{{
		Topic:   req.Topic,
		Payload: newSnapshotToFollow{},
	}})
	result.buffer.AppendItem(snapFromCache.First)
	return e.subscriptions.add(req, result.First, freeBuf), nil
}
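
// Sketch of the consumer side, assuming this package's Subscription.Next
// method and a hypothetical handle callback: receive events until the
// subscription errors (for example, because it was force-closed), then
// resubscribe.
//
//	sub, err := publisher.Subscribe(req)
//	if err != nil {
//		return err
//	}
//	defer sub.Unsubscribe()
//	for {
//		event, err := sub.Next(ctx)
//		if err != nil {
//			return err // e.g. force-closed; the caller should resubscribe
//		}
//		handle(event)
//	}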

func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem, freeBuf func()) *Subscription {
	// We wrap freeBuf in a sync.Once as it's expected that Subscription.unsub is
	// idempotent, but freeBuf decrements the reference counter on every call.
	var once sync.Once
	sub := newSubscription(*req, head, func() {
		s.unsubscribe(req)
		once.Do(freeBuf)
	})

	s.lock.Lock()
	defer s.lock.Unlock()

	subsByToken, ok := s.byToken[req.Token]
	if !ok {
		subsByToken = make(map[*SubscribeRequest]*Subscription)
		s.byToken[req.Token] = subsByToken
	}
	subsByToken[req] = sub
	return sub
}

func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, secretID := range tokenSecretIDs {
		if subs, ok := s.byToken[secretID]; ok {
			for _, sub := range subs {
				sub.forceClose()
			}
		}
	}
}

func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
	s.lock.Lock()
	defer s.lock.Unlock()

	subsByToken, ok := s.byToken[req.Token]
	if !ok {
		return
	}
	delete(subsByToken, req)
	if len(subsByToken) == 0 {
		delete(s.byToken, req.Token)
	}
}

func (s *subscriptions) closeAll() {
	s.lock.Lock()
	defer s.lock.Unlock()

	for _, byRequest := range s.byToken {
		for _, sub := range byRequest {
			sub.shutDown()
		}
	}
}

func (s *subscriptions) closeAllByTopic(topic Topic) {
	s.lock.Lock()
	defer s.lock.Unlock()

	for _, byRequest := range s.byToken {
		for _, sub := range byRequest {
			if sub.req.Topic == topic {
				sub.forceClose()
			}
		}
	}
}

// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
	snap, ok := e.snapCache[req.topicSubject()]
	if ok && snap.err() == nil {
		return snap
	}
	return nil
}

// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *eventSnapshot) {
	if e.snapCacheTTL == 0 {
		return
	}
	e.snapCache[req.topicSubject()] = snap

	// Set up cache eviction once the TTL expires.
	time.AfterFunc(e.snapCacheTTL, func() {
		e.lock.Lock()
		defer e.lock.Unlock()
		delete(e.snapCache, req.topicSubject())
	})
}

// forceEvictByTopic will remove all entries from the snapshot cache for a given
// topic. It acquires the publisher's lock itself, so it must NOT be called while
// e.lock is already held.
func (e *EventPublisher) forceEvictByTopic(topic Topic) {
	e.lock.Lock()
	for key := range e.snapCache {
		if key.Topic == topic.String() {
			delete(e.snapCache, key)
		}
	}
	e.lock.Unlock()
}