diff --git a/.changelog/12080.txt b/.changelog/12080.txt new file mode 100644 index 000000000..eab0ce590 --- /dev/null +++ b/.changelog/12080.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +streaming: Improved performance when the server is handling many concurrent subscriptions and has a high number of CPU cores +``` diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 7ee4e4dce..38c37c653 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -32,31 +32,26 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo return e.Value.CanRead(authz) == acl.Allow } -func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace, partition string) bool { - if key == "" && namespace == "" && partition == "" { - return true - } - - if e.Value.Service == nil { - return false - } - - name := e.Value.Service.Service - if e.overrideKey != "" { - name = e.overrideKey - } - ns := e.Value.Service.EnterpriseMeta.NamespaceOrDefault() - if e.overrideNamespace != "" { - ns = e.overrideNamespace - } - ap := e.Value.Service.EnterpriseMeta.PartitionOrDefault() +func (e EventPayloadCheckServiceNode) Subject() stream.Subject { + partition := e.Value.Service.PartitionOrDefault() if e.overridePartition != "" { - ap = e.overridePartition + partition = e.overridePartition } + partition = strings.ToLower(partition) - return (key == "" || strings.EqualFold(key, name)) && - (namespace == "" || strings.EqualFold(namespace, ns)) && - (partition == "" || strings.EqualFold(partition, ap)) + namespace := e.Value.Service.NamespaceOrDefault() + if e.overrideNamespace != "" { + namespace = e.overrideNamespace + } + namespace = strings.ToLower(namespace) + + key := e.Value.Service.Service + if e.overrideKey != "" { + key = e.overrideKey + } + key = strings.ToLower(key) + + return stream.Subject(partition + "/" + namespace + "/" + key) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -67,8 +62,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { defer tx.Abort() connect := topic == topicServiceHealthConnect - entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace) - idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta) + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta) if err != nil { return 0, err } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 9becf5bcb..16269242e 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -11,11 +11,106 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" ) +func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { + // Matches. 
+ for desc, tc := range map[string]struct { + evt EventPayloadCheckServiceNode + req stream.SubscribeRequest + }{ + "default partition and namespace": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "foo", + }, + }, + }, + stream.SubscribeRequest{ + Key: "foo", + EnterpriseMeta: structs.EnterpriseMeta{}, + }, + }, + "mixed casing": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "FoO", + }, + }, + }, + stream.SubscribeRequest{Key: "foo"}, + }, + "override key": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "foo", + }, + }, + overrideKey: "bar", + }, + stream.SubscribeRequest{Key: "bar"}, + }, + } { + t.Run(desc, func(t *testing.T) { + require.Equal(t, tc.req.Subject(), tc.evt.Subject()) + }) + } + + // Non-matches. + for desc, tc := range map[string]struct { + evt EventPayloadCheckServiceNode + req stream.SubscribeRequest + }{ + "different key": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "foo", + }, + }, + }, + stream.SubscribeRequest{ + Key: "bar", + }, + }, + "different partition": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "foo", + }, + }, + overridePartition: "bar", + }, + stream.SubscribeRequest{ + Key: "foo", + }, + }, + "different namespace": { + EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: "foo", + }, + }, + overrideNamespace: "bar", + }, + stream.SubscribeRequest{ + Key: "foo", + }, + }, + } { + t.Run(desc, func(t *testing.T) { + require.NotEqual(t, tc.req.Subject(), tc.evt.Subject()) + }) + } +} + func TestServiceHealthSnapshot(t *testing.T) { store := NewStateStore(nil) @@ -1771,7 +1866,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { // all events for a particular topic are grouped together. The sort is // stable so events with the same key retain their relative order. // -// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey +// This sort should match the logic in EventPayloadCheckServiceNode.Subject // to avoid masking bugs. 
var cmpPartialOrderEvents = cmp.Options{ cmpopts.SortSlices(func(i, j stream.Event) bool { @@ -2418,107 +2513,6 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) } } -func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { - type testCase struct { - name string - payload EventPayloadCheckServiceNode - key string - namespace string - partition string // TODO(partitions): create test cases for this being set - expected bool - } - - fn := func(t *testing.T, tc testCase) { - if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" { - t.Skip("cant test namespace matching without namespace support") - } - - require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace, tc.partition)) - } - - var testCases = []testCase{ - { - name: "no key or namespace", - payload: newPayloadCheckServiceNode("srv1", "ns1"), - expected: true, - }, - { - name: "no key, with namespace match", - payload: newPayloadCheckServiceNode("srv1", "ns1"), - namespace: "ns1", - expected: true, - }, - { - name: "no namespace, with key match", - payload: newPayloadCheckServiceNode("srv1", "ns1"), - key: "srv1", - expected: true, - }, - { - name: "key match, namespace mismatch", - payload: newPayloadCheckServiceNode("srv1", "ns1"), - key: "srv1", - namespace: "ns2", - expected: false, - }, - { - name: "key mismatch, namespace match", - payload: newPayloadCheckServiceNode("srv1", "ns1"), - key: "srv2", - namespace: "ns1", - expected: false, - }, - { - name: "override key match", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""), - key: "srv1", - namespace: "ns1", - expected: true, - }, - { - name: "override key mismatch", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""), - key: "proxy", - namespace: "ns1", - expected: false, - }, - { - name: "override namespace match", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"), - key: "proxy", - namespace: "ns2", - expected: true, - }, - { - name: "override namespace mismatch", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"), - key: "proxy", - namespace: "ns1", - expected: false, - }, - { - name: "override both key and namespace match", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"), - key: "srv1", - namespace: "ns2", - expected: true, - }, - { - name: "override both key and namespace mismatch namespace", - payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"), - key: "proxy", - namespace: "ns1", - expected: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fn(t, tc) - }) - } -} - func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode { return EventPayloadCheckServiceNode{ Value: &structs.CheckServiceNode{ diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 60709f5fd..edd051389 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -419,26 +419,14 @@ type nodePayload struct { node *structs.ServiceNode } -func (p nodePayload) MatchesKey(key, _, partition string) bool { - if key == "" && partition == "" { - return true - } - - if p.node == nil { - return false - } - - if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() { - return false - } - - return p.key == key -} - func (p nodePayload) HasReadPermission(acl.Authorizer) bool { return true } +func 
(p nodePayload) Subject() stream.Subject { + return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key) +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 285710543..9240f6524 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -14,6 +14,11 @@ import ( // events which match the Topic. type Topic fmt.Stringer +// Subject identifies a portion of a topic for which a subscriber wishes to +// receive events (e.g. health events for a particular service) usually the +// normalized resource name (including partition and namespace if applicable). +type Subject string + // Event is a structure with identifiers and a payload. Events are Published to // EventPublisher and returned to Subscribers. type Event struct { @@ -26,18 +31,16 @@ type Event struct { // should not modify the state of the payload if the Event is being submitted to // EventPublisher.Publish. type Payload interface { - // MatchesKey must return true if the Payload should be included in a - // subscription requested with the key, namespace, and partition. - // - // Generally this means that the payload matches the key, namespace, and - // partition or the payload is a special framing event that should be - // returned to every subscription. - MatchesKey(key, namespace, partition string) bool - // HasReadPermission uses the acl.Authorizer to determine if the items in the // Payload are visible to the request. It returns true if the payload is // authorized for Read, otherwise returns false. HasReadPermission(authz acl.Authorizer) bool + + // Subject is used to identify which subscribers should be notified of this + // event - e.g. those subscribing to health events for a particular service. + // it is usually the normalized resource name (including the partition and + // namespace if applicable). + Subject() Subject } // PayloadEvents is a Payload that may be returned by Subscription.Next when @@ -81,14 +84,6 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool { return true } -// MatchesKey filters the PayloadEvents to those which match the key, -// namespace, and partition. -func (p *PayloadEvents) MatchesKey(key, namespace, partition string) bool { - return p.filter(func(event Event) bool { - return event.Payload.MatchesKey(key, namespace, partition) - }) -} - func (p *PayloadEvents) Len() int { return len(p.Items) } @@ -101,6 +96,14 @@ func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool { }) } +// Subject is required to satisfy the Payload interface but is not implemented +// by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next +// *after* Subject has been used to dispatch the enclosed events to the correct +// buffer. +func (PayloadEvents) Subject() Subject { + panic("PayloadEvents does not implement Subject") +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. 
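Note on the new contract: the Payload interface above swaps the per-subscriber MatchesKey check for a single pre-computed Subject. A minimal standalone sketch of the idea follows (examplePayload and its fields are hypothetical; the real payloads in agent/consul/state also implement HasReadPermission and use the EnterpriseMeta helpers):

```go
package main

import (
	"fmt"
	"strings"
)

// Subject mirrors the stream.Subject idea: a pre-computed routing key,
// rather than a per-event MatchesKey comparison.
type Subject string

// examplePayload is a hypothetical payload type used only for illustration.
type examplePayload struct {
	partition, namespace, service string
}

// Subject normalizes the resource identifiers once, so the publisher can use
// the result directly as a map key when routing events to buffers.
func (p examplePayload) Subject() Subject {
	return Subject(strings.ToLower(p.partition) + "/" +
		strings.ToLower(p.namespace) + "/" +
		strings.ToLower(p.service))
}

func main() {
	p := examplePayload{partition: "Default", namespace: "Default", service: "FoO"}
	fmt.Println(p.Subject()) // default/default/foo
}
```

Because the subject is computed once on the event side, the publisher can route by map lookup instead of invoking a matcher for every open subscription.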
@@ -117,12 +120,15 @@ func (e Event) IsNewSnapshotToFollow() bool { type framingEvent struct{} -func (framingEvent) MatchesKey(string, string, string) bool { +func (framingEvent) HasReadPermission(acl.Authorizer) bool { return true } -func (framingEvent) HasReadPermission(acl.Authorizer) bool { - return true +// Subject is required by the Payload interface but is not implemented by +// framing events, as they are typically *manually* appended to the correct +// buffer and do not need to be routed using a Subject. +func (framingEvent) Subject() Subject { + panic("framing events do not implement Subject") } type endOfSnapshot struct { @@ -137,12 +143,15 @@ type closeSubscriptionPayload struct { tokensSecretIDs []string } -func (closeSubscriptionPayload) MatchesKey(string, string, string) bool { +func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool { return false } -func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool { - return false +// Subject is required by the Payload interface but it is not implemented by +// closeSubscriptionPayload, as this event type is handled separately and not +// actually appended to the buffer. +func (closeSubscriptionPayload) Subject() Subject { + panic("closeSubscriptionPayload does not implement Subject") } // NewCloseSubscriptionEvent returns a special Event that is handled by the diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 163fa8109..094101355 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -20,16 +20,16 @@ type EventPublisher struct { // seconds. snapCacheTTL time.Duration - // This lock protects the topicBuffers, and snapCache + // This lock protects the snapCache, topicBuffers and topicBuffer.refs. lock sync.RWMutex - // topicBuffers stores the head of the linked-list buffer to publish events to + // topicBuffers stores the head of the linked-list buffers to publish events to // for a topic. - topicBuffers map[Topic]*eventBuffer + topicBuffers map[topicSubject]*topicBuffer - // snapCache if a cache of EventSnapshots indexed by topic and key. + // snapCache if a cache of EventSnapshots indexed by topic and subject. // TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL - snapCache map[Topic]map[string]*eventSnapshot + snapCache map[topicSubject]*eventSnapshot subscriptions *subscriptions @@ -41,6 +41,13 @@ type EventPublisher struct { snapshotHandlers SnapshotHandlers } +// topicSubject is used as a map key when accessing topic buffers and cached +// snapshots. +type topicSubject struct { + Topic Topic + Subject Subject +} + type subscriptions struct { // lock for byToken. If both subscription.lock and EventPublisher.lock need // to be held, EventPublisher.lock MUST always be acquired first. @@ -54,6 +61,14 @@ type subscriptions struct { byToken map[string]map[*SubscribeRequest]*Subscription } +// topicBuffer augments the eventBuffer with a reference counter, enabling +// clean up of unused buffers once there are no longer any subscribers for +// the given topic and key. +type topicBuffer struct { + refs int // refs is guarded by EventPublisher.lock. + buf *eventBuffer +} + // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender. // The nil Topic is reserved and should not be used. 
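The topicSubject and topicBuffer types above are the heart of the change: buffers and cached snapshots are now keyed by a comparable struct instead of nested maps plus a concatenated string, and each buffer carries a reference count so it can be freed when the last subscriber leaves. A rough standalone sketch of that pattern, using illustrative names (routeKey, refCounted) rather than the real types:

```go
package main

import "fmt"

// routeKey plays the role of topicSubject: a struct whose fields are all
// comparable can be used directly as a map key.
type routeKey struct {
	Topic   string
	Subject string
}

// refCounted stands in for topicBuffer: the buffer plus a counter so the
// publisher knows when the last subscriber has gone away.
type refCounted struct {
	refs int
	buf  []string // placeholder for *eventBuffer
}

func main() {
	buffers := map[routeKey]*refCounted{}

	k := routeKey{Topic: "ServiceHealth", Subject: "default/default/web"}
	buffers[k] = &refCounted{refs: 1} // first subscriber creates the buffer
	buffers[k].refs++                 // second subscriber on the same topic/subject

	buffers[k].refs--
	buffers[k].refs--
	if buffers[k].refs == 0 {
		delete(buffers, k) // free the buffer once nobody is reading it
	}
	fmt.Println(len(buffers)) // 0
}
```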
@@ -79,8 +94,8 @@ type SnapshotAppender interface { func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[Topic]*eventBuffer), - snapCache: make(map[Topic]map[string]*eventSnapshot), + topicBuffers: make(map[topicSubject]*topicBuffer), + snapCache: make(map[topicSubject]*eventSnapshot), publishCh: make(chan []Event, 64), subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), @@ -116,36 +131,59 @@ func (e *EventPublisher) Run(ctx context.Context) { // publishEvent appends the events to any applicable topic buffers. It handles // any closeSubscriptionPayload events by closing associated subscriptions. func (e *EventPublisher) publishEvent(events []Event) { - eventsByTopic := make(map[Topic][]Event) + groupedEvents := make(map[topicSubject][]Event) for _, event := range events { if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok { e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs) continue } - eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) + groupKey := topicSubject{event.Topic, event.Payload.Subject()} + groupedEvents[groupKey] = append(groupedEvents[groupKey], event) } e.lock.Lock() defer e.lock.Unlock() - for topic, events := range eventsByTopic { - e.getTopicBuffer(topic).Append(events) + for groupKey, events := range groupedEvents { + // Note: bufferForPublishing returns nil if there are no subscribers for the + // given topic and subject, in which case events will be dropped on the floor and + // future subscribers will catch up by consuming the snapshot. + if buf := e.bufferForPublishing(groupKey); buf != nil { + buf.Append(events) + } } } -// getTopicBuffer for the topic. Creates a new event buffer if one does not -// already exist. +// bufferForSubscription returns the topic event buffer to which events for the +// given topic and key will be appended. If no such buffer exists, a new buffer +// will be created. // -// EventPublisher.lock must be held to call this method. -func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer { - buf, ok := e.topicBuffers[topic] +// Warning: e.lock MUST be held when calling this function. +func (e *EventPublisher) bufferForSubscription(key topicSubject) *topicBuffer { + buf, ok := e.topicBuffers[key] if !ok { - buf = newEventBuffer() - e.topicBuffers[topic] = buf + buf = &topicBuffer{ + buf: newEventBuffer(), + } + e.topicBuffers[key] = buf } + return buf } +// bufferForPublishing returns the event buffer to which events for the given +// topic and key should be appended. nil will be returned if there are no +// subscribers for the given topic and key. +// +// Warning: e.lock MUST be held when calling this function. +func (e *EventPublisher) bufferForPublishing(key topicSubject) *eventBuffer { + buf, ok := e.topicBuffers[key] + if !ok { + return nil + } + return buf.buf +} + // Subscribe returns a new Subscription for the given request. A subscription // will receive an initial snapshot of events matching the request if req.Index > 0. // After the snapshot, events will be streamed as they are created. 
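bufferForPublishing above only returns a buffer that already exists, so events for a (topic, subject) pair with no subscribers are dropped rather than buffered indefinitely; a later subscriber catches up from a fresh snapshot instead. A small standalone sketch of that dispatch flow, with illustrative types rather than the Consul ones:

```go
package main

import "fmt"

type groupKey struct{ topic, subject string }

type event struct {
	topic, subject, payload string
}

// dispatch mimics publishEvent: group incoming events by (topic, subject),
// then append only to buffers that already exist, i.e. that have at least
// one subscriber. Events for unwatched subjects are dropped.
func dispatch(buffers map[groupKey][]event, events []event) {
	grouped := make(map[groupKey][]event)
	for _, e := range events {
		k := groupKey{e.topic, e.subject}
		grouped[k] = append(grouped[k], e)
	}
	for k, evs := range grouped {
		if buf, ok := buffers[k]; ok {
			buffers[k] = append(buf, evs...)
		}
	}
}

func main() {
	watched := groupKey{"ServiceHealth", "default/default/web"}
	buffers := map[groupKey][]event{watched: {}} // one active subscriber

	dispatch(buffers, []event{
		{"ServiceHealth", "default/default/web", "kept"},
		{"ServiceHealth", "default/default/db", "dropped"},
	})
	fmt.Println(len(buffers[watched])) // 1
}
```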
@@ -163,7 +201,34 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) e.lock.Lock() defer e.lock.Unlock() - topicHead := e.getTopicBuffer(req.Topic).Head() + topicBuf := e.bufferForSubscription(req.topicSubject()) + topicBuf.refs++ + + // freeBuf is used to free the topic buffer once there are no remaining + // subscribers for the given topic and key. + // + // Note: it's called by Subcription.Unsubscribe which has its own side-effects + // that are made without holding e.lock (so there's a moment where the ref + // counter is inconsistent with the subscription map) — in practice this is + // fine, we don't need these things to be strongly consistent. The alternative + // would be to hold both locks, which introduces the risk of deadlocks. + freeBuf := func() { + e.lock.Lock() + defer e.lock.Unlock() + + topicBuf.refs-- + + if topicBuf.refs == 0 { + delete(e.topicBuffers, req.topicSubject()) + + // Evict cached snapshot too because the topic buffer will have been spliced + // onto it. If we don't do this, any new subscribers started before the cache + // TTL is reached will get "stuck" waiting on the old buffer. + delete(e.snapCache, req.topicSubject()) + } + } + + topicHead := topicBuf.buf.Head() // If the client view is fresh, resume the stream. if req.Index > 0 && topicHead.HasEventIndex(req.Index) { @@ -173,7 +238,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) // the subscription will receive new events. next, _ := topicHead.NextNoBlock() buf.AppendItem(next) - return e.subscriptions.add(req, subscriptionHead), nil + return e.subscriptions.add(req, subscriptionHead, freeBuf), nil } snapFromCache := e.getCachedSnapshotLocked(req) @@ -186,7 +251,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) // If the request.Index is 0 the client has no view, send a full snapshot. if req.Index == 0 { - return e.subscriptions.add(req, snapFromCache.First), nil + return e.subscriptions.add(req, snapFromCache.First, freeBuf), nil } // otherwise the request has an Index, the client view is stale and must be reset @@ -197,11 +262,17 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) Payload: newSnapshotToFollow{}, }}) result.buffer.AppendItem(snapFromCache.First) - return e.subscriptions.add(req, result.First), nil + return e.subscriptions.add(req, result.First, freeBuf), nil } -func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription { - sub := newSubscription(*req, head, s.unsubscribe(req)) +func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem, freeBuf func()) *Subscription { + // We wrap freeBuf in a sync.Once as it's expected that Subscription.unsub is + // idempotent, but freeBuf decrements the reference counter on every call. + var once sync.Once + sub := newSubscription(*req, head, func() { + s.unsubscribe(req) + once.Do(freeBuf) + }) s.lock.Lock() defer s.lock.Unlock() @@ -228,24 +299,17 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { } } -// unsubscribe returns a function that the subscription will call to remove -// itself from the subsByToken. -// This function is returned as a closure so that the caller doesn't need to keep -// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the -// wrong pointer. 
-func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { - return func() { - s.lock.Lock() - defer s.lock.Unlock() +func (s *subscriptions) unsubscribe(req *SubscribeRequest) { + s.lock.Lock() + defer s.lock.Unlock() - subsByToken, ok := s.byToken[req.Token] - if !ok { - return - } - delete(subsByToken, req) - if len(subsByToken) == 0 { - delete(s.byToken, req.Token) - } + subsByToken, ok := s.byToken[req.Token] + if !ok { + return + } + delete(subsByToken, req) + if len(subsByToken) == 0 { + delete(s.byToken, req.Token) } } @@ -262,13 +326,7 @@ func (s *subscriptions) closeAll() { // EventPublisher.lock must be held to call this method. func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot { - topicSnaps, ok := e.snapCache[req.Topic] - if !ok { - topicSnaps = make(map[string]*eventSnapshot) - e.snapCache[req.Topic] = topicSnaps - } - - snap, ok := topicSnaps[snapCacheKey(req)] + snap, ok := e.snapCache[req.topicSubject()] if ok && snap.err() == nil { return snap } @@ -280,16 +338,12 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev if e.snapCacheTTL == 0 { return } - e.snapCache[req.Topic][snapCacheKey(req)] = snap + e.snapCache[req.topicSubject()] = snap // Setup a cache eviction time.AfterFunc(e.snapCacheTTL, func() { e.lock.Lock() defer e.lock.Unlock() - delete(e.snapCache[req.Topic], snapCacheKey(req)) + delete(e.snapCache, req.topicSubject()) }) } - -func snapCacheKey(req *SubscribeRequest) string { - return req.Partition + "/" + req.Namespace + "/" + req.Key -} diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index af7fc3c28..f90af0b1b 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -56,6 +56,13 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, } require.Equal(t, expected, next) + + // Subscriber should not see events for other keys + publisher.Publish([]Event{{ + Topic: testTopic, + Payload: simplePayload{key: "other-key", value: "this-should-not-reach-the-subscriber"}, + }}) + assertNoResult(t, eventCh) } var testSnapshotEvent = Event{ @@ -70,17 +77,12 @@ type simplePayload struct { noReadPerm bool } -func (p simplePayload) MatchesKey(key, _, _ string) bool { - if key == "" { - return true - } - return p.key == key -} - func (p simplePayload) HasReadPermission(acl.Authorizer) bool { return !p.noReadPerm } +func (p simplePayload) Subject() Subject { return Subject("default/default/" + p.key) } + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { @@ -190,9 +192,10 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) go publisher.Run(ctx) + sub, err := publisher.Subscribe(req) require.NoError(t, err) - sub.Unsubscribe() + defer sub.Unsubscribe() publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { return 0, fmt.Errorf("error should not be seen, cache should have been used") @@ -200,6 +203,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { sub, err = publisher.Subscribe(req) require.NoError(t, err) + defer sub.Unsubscribe() eventCh := runSubscription(ctx, sub) next := getNextEvent(t, eventCh) @@ -233,7 +237,11 @@ func 
TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) go publisher.Run(ctx) - // Include the same event in the topicBuffer + + simulateExistingSubscriber(t, publisher, req) + + // Publish the testSnapshotEvent, to ensure that it is skipped over when + // splicing the topic buffer onto the snapshot. publisher.publishEvent([]Event{testSnapshotEvent}) runStep(t, "start a subscription and unsub", func(t *testing.T) { @@ -338,7 +346,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) go publisher.Run(ctx) - // Include the same event in the topicBuffer + + simulateExistingSubscriber(t, publisher, req) + + // Publish the testSnapshotEvent, to ensure that it is skipped over when + // splicing the topic buffer onto the snapshot. publisher.publishEvent([]Event{testSnapshotEvent}) runStep(t, "start a subscription and unsub", func(t *testing.T) { @@ -421,7 +433,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi publisher := NewEventPublisher(handlers, time.Second) go publisher.Run(ctx) - // Include the same events in the topicBuffer + + simulateExistingSubscriber(t, publisher, req) + + // Publish the events, to ensure they are is skipped over when splicing the + // topic buffer onto the snapshot. publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{nextEvent}) @@ -495,3 +511,60 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "subscription was closed by unsubscribe") } + +func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) { + req := &SubscribeRequest{ + Topic: testTopic, + Key: "sub-key", + } + + publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) + + sub1, err := publisher.Subscribe(req) + require.NoError(t, err) + + // Expect a topic buffer and snapshot to have been created. + publisher.lock.Lock() + require.NotNil(t, publisher.topicBuffers[req.topicSubject()]) + require.NotNil(t, publisher.snapCache[req.topicSubject()]) + publisher.lock.Unlock() + + // Create another subscription and close the old one, to ensure the buffer and + // snapshot stick around as long as there's at least one subscriber. + sub2, err := publisher.Subscribe(req) + require.NoError(t, err) + + sub1.Unsubscribe() + + publisher.lock.Lock() + require.NotNil(t, publisher.topicBuffers[req.topicSubject()]) + require.NotNil(t, publisher.snapCache[req.topicSubject()]) + publisher.lock.Unlock() + + // Close the other subscription and expect the buffer and snapshot to have + // been cleaned up. + sub2.Unsubscribe() + + publisher.lock.Lock() + require.Nil(t, publisher.topicBuffers[req.topicSubject()]) + require.Nil(t, publisher.snapCache[req.topicSubject()]) + publisher.lock.Unlock() +} + +// simulateExistingSubscriber creates a subscription that remains open throughout +// a test to prevent the topic buffer getting garbage-collected. +// +// It evicts the created snapshot from the cache immediately (simulating an +// existing subscription that has been open long enough the snapshot's TTL has +// been reached) so you can test snapshots getting created afresh. 
+func simulateExistingSubscriber(t *testing.T, p *EventPublisher, r *SubscribeRequest) { + t.Helper() + + sub, err := p.Subscribe(r) + require.NoError(t, err) + t.Cleanup(sub.Unsubscribe) + + p.lock.Lock() + delete(p.snapCache, r.topicSubject()) + p.lock.Unlock() +} diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index a3187017c..a9b381fe4 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -20,119 +20,6 @@ func newSimpleEvent(key string, index uint64) Event { return Event{Index: index, Payload: simplePayload{key: key}} } -func TestPayloadEvents_FilterByKey(t *testing.T) { - type testCase struct { - name string - req SubscribeRequest - events []Event - expectEvent bool - expected *PayloadEvents - expectedCap int - } - - fn := func(t *testing.T, tc testCase) { - events := make([]Event, 0, 5) - events = append(events, tc.events...) - - pe := &PayloadEvents{Items: events} - ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace, tc.req.Partition) - require.Equal(t, tc.expectEvent, ok) - if !tc.expectEvent { - return - } - - require.Equal(t, tc.expected, pe) - // test if there was a new array allocated or not - require.Equal(t, tc.expectedCap, cap(pe.Items)) - } - - var testCases = []testCase{ - { - name: "all events match, no key or namespace", - req: SubscribeRequest{Topic: testTopic}, - events: []Event{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}, - expectEvent: true, - expected: newPayloadEvents( - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)), - expectedCap: 5, - }, - { - name: "all events match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}, - expectEvent: true, - expected: newPayloadEvents( - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)), - expectedCap: 5, - }, - { - name: "all events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}, - expectEvent: true, - expected: newPayloadEvents( - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")), - expectedCap: 5, - }, - { - name: "some evens match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 104), - newSimpleEvent("Other", 104), - newSimpleEvent("Same", 104)}, - expectEvent: true, - expected: newPayloadEvents( - newSimpleEvent("Same", 104), - newSimpleEvent("Same", 104)), - expectedCap: 2, - }, - { - name: "some events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "apps"), - newNSEvent("db1", "dbs"), - newNSEvent("app2", "apps")}, - expectEvent: true, - expected: newPayloadEvents( - newNSEvent("app1", "apps"), - newNSEvent("app2", "apps")), - expectedCap: 2, - }, - { - name: "no events match key", - req: SubscribeRequest{Topic: testTopic, Key: "Other"}, - events: []Event{ - newSimpleEvent("Same", 0), - newSimpleEvent("Same", 0)}, - }, - { - name: "no events match namespace", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "group1"), - newNSEvent("app2", "group2")}, - expectEvent: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fn(t, tc) - }) - } -} - // TODO(partitions) func newNSEvent(key, namespace string) Event { return Event{Index: 22, Payload: nsPayload{key: key, 
namespace: namespace}} @@ -146,12 +33,6 @@ type nsPayload struct { value string } -func (p nsPayload) MatchesKey(key, namespace, partition string) bool { - return (key == "" || key == p.key) && - (namespace == "" || namespace == p.namespace) && - (partition == "" || partition == p.partition) -} - func TestPayloadEvents_HasReadPermission(t *testing.T) { t.Run("some events filtered", func(t *testing.T) { ep := newPayloadEvents( diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 9f47cd2ee..0a4294715 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -4,7 +4,10 @@ import ( "context" "errors" "fmt" + "strings" "sync/atomic" + + "github.com/hashicorp/consul/agent/structs" ) const ( @@ -59,12 +62,9 @@ type SubscribeRequest struct { // be returned by the subscription. A blank key will return all events. Key // is generally the name of the resource. Key string - // Namespace used to filter events in the topic. Only events matching the - // namespace will be returned by the subscription. - Namespace string - // Partition used to filter events in the topic. Only events matching the - // partition will be returned by the subscription. - Partition string // TODO(partitions): make this work + // EnterpriseMeta is used to filter events in the topic. Only events matching + // the partition and namespace will be returned by the subscription. + EnterpriseMeta structs.EnterpriseMeta // Token that was used to authenticate the request. If any ACL policy // changes impact the token the subscription will be forcefully closed. Token string @@ -74,6 +74,19 @@ type SubscribeRequest struct { Index uint64 } +func (req SubscribeRequest) Subject() Subject { + var ( + partition = req.EnterpriseMeta.PartitionOrDefault() + namespace = req.EnterpriseMeta.NamespaceOrDefault() + key = strings.ToLower(req.Key) + ) + return Subject(partition + "/" + namespace + "/" + key) +} + +func (req SubscribeRequest) topicSubject() topicSubject { + return topicSubject{req.Topic, req.Subject()} +} + // newSubscription return a new subscription. The caller is responsible for // calling Unsubscribe when it is done with the subscription, to free resources. 
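SubscribeRequest.Subject() above normalizes the request side the same way the payload side does, so both resolve to the same map key regardless of casing or omitted enterprise metadata. A standalone sketch of that normalization, matching the "BaZ" case in TestSubscription_Subject (subjectFor is a hypothetical helper; the real method relies on EnterpriseMeta's PartitionOrDefault/NamespaceOrDefault):

```go
package main

import (
	"fmt"
	"strings"
)

// subjectFor sketches SubscribeRequest.Subject(): empty partition/namespace
// fall back to "default" and the key is lowercased, so request-side and
// event-side subjects compare equal regardless of caller casing.
func subjectFor(partition, namespace, key string) string {
	if partition == "" {
		partition = "default"
	}
	if namespace == "" {
		namespace = "default"
	}
	return partition + "/" + namespace + "/" + strings.ToLower(key)
}

func main() {
	fmt.Println(subjectFor("", "", "BaZ")) // default/default/baz
}
```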
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription { @@ -104,11 +117,7 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) { if len(next.Events) == 0 { continue } - event := newEventFromBatch(s.req, next.Events) - if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace, s.req.Partition) { - continue - } - return event, nil + return newEventFromBatch(s.req, next.Events), nil } } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index f9a473c0c..cf3be6393 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -6,10 +6,32 @@ import ( time "time" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" ) func noopUnSub() {} +func TestSubscription_Subject(t *testing.T) { + for desc, tc := range map[string]struct { + req SubscribeRequest + sub Subject + }{ + "default partition and namespace": { + SubscribeRequest{Key: "foo", EnterpriseMeta: structs.EnterpriseMeta{}}, + "default/default/foo", + }, + "mixed casing": { + SubscribeRequest{Key: "BaZ"}, + "default/default/baz", + }, + } { + t.Run(desc, func(t *testing.T) { + require.Equal(t, tc.sub, tc.req.Subject()) + }) + } +} + func TestSubscription(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -59,10 +81,6 @@ func TestSubscription(t *testing.T) { "Event should have been delivered after short time, took %s", elapsed) require.Equal(t, index, got.Index) - // Event with wrong key should not be delivered. Deliver a good message right - // so we don't have to block test thread forever or cancel func yet. - index++ - publishTestEvent(index, eb, "nope") index++ publishTestEvent(index, eb, "test") diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 4c7255c62..1a9d0031a 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -57,6 +57,10 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub return err } + if req.Key == "" { + return status.Error(codes.InvalidArgument, "Key is required") + } + sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta)) if err != nil { return err @@ -89,12 +93,11 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest { return &stream.SubscribeRequest{ - Topic: req.Topic, - Key: req.Key, - Token: req.Token, - Index: req.Index, - Namespace: entMeta.NamespaceOrEmpty(), - Partition: entMeta.PartitionOrEmpty(), + Topic: req.Topic, + Key: req.Key, + EnterpriseMeta: entMeta, + Token: req.Token, + Index: req.Index, } } diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 5508b1571..0c928b420 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -30,6 +30,33 @@ import ( "github.com/hashicorp/consul/types" ) +func TestServer_Subscribe_KeyIsRequired(t *testing.T) { + backend, err := newTestBackend() + require.NoError(t, err) + + addr := runTestServer(t, NewServer(backend, hclog.New(nil))) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(logError(t, conn.Close)) + + client := pbsubscribe.NewStateChangeSubscriptionClient(conn) + + 
stream, err := client.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "", + }) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err) + require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String()) + require.Contains(t, err.Error(), "Key is required") +} + func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) @@ -878,6 +905,8 @@ func assertNoEvents(t *testing.T, chEvents chan eventOrError) { func logError(t *testing.T, f func() error) func() { return func() { + t.Helper() + if err := f(); err != nil { t.Logf(err.Error()) } diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index 65a9ab7d4..98ad79087 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -90,9 +90,8 @@ type SubscribeRequest struct { Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"` // Key is a topic-specific identifier that restricts the scope of the // subscription to only events pertaining to that identifier. For example, - // to receive events for a single service, the service's name is - // specified as the key. An empty key indicates that all events in the topic - // are of interest. + // to receive events for a single service, the service's name is specified + // as the key. Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` // Token is the ACL token to authenticate the request. The token must have // sufficient privileges to read the requested information otherwise events diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index 95aac6c29..a860b874b 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -51,9 +51,8 @@ message SubscribeRequest { // Key is a topic-specific identifier that restricts the scope of the // subscription to only events pertaining to that identifier. For example, - // to receive events for a single service, the service's name is - // specified as the key. An empty key indicates that all events in the topic - // are of interest. + // to receive events for a single service, the service's name is specified + // as the key. string Key = 2; // Token is the ACL token to authenticate the request. The token must have