streaming: split event buffer by key (#12080)
This commit is contained in:
parent
a3c4b85cec
commit
ebdda4848f
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:enhancement
|
||||||
|
streaming: Improved performance when the server is handling many concurrent subscriptions and has a high number of CPU cores
|
||||||
|
```
|
|
@ -32,31 +32,26 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo
|
||||||
return e.Value.CanRead(authz) == acl.Allow
|
return e.Value.CanRead(authz) == acl.Allow
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace, partition string) bool {
|
func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
|
||||||
if key == "" && namespace == "" && partition == "" {
|
partition := e.Value.Service.PartitionOrDefault()
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.Value.Service == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
name := e.Value.Service.Service
|
|
||||||
if e.overrideKey != "" {
|
|
||||||
name = e.overrideKey
|
|
||||||
}
|
|
||||||
ns := e.Value.Service.EnterpriseMeta.NamespaceOrDefault()
|
|
||||||
if e.overrideNamespace != "" {
|
|
||||||
ns = e.overrideNamespace
|
|
||||||
}
|
|
||||||
ap := e.Value.Service.EnterpriseMeta.PartitionOrDefault()
|
|
||||||
if e.overridePartition != "" {
|
if e.overridePartition != "" {
|
||||||
ap = e.overridePartition
|
partition = e.overridePartition
|
||||||
}
|
}
|
||||||
|
partition = strings.ToLower(partition)
|
||||||
|
|
||||||
return (key == "" || strings.EqualFold(key, name)) &&
|
namespace := e.Value.Service.NamespaceOrDefault()
|
||||||
(namespace == "" || strings.EqualFold(namespace, ns)) &&
|
if e.overrideNamespace != "" {
|
||||||
(partition == "" || strings.EqualFold(partition, ap))
|
namespace = e.overrideNamespace
|
||||||
|
}
|
||||||
|
namespace = strings.ToLower(namespace)
|
||||||
|
|
||||||
|
key := e.Value.Service.Service
|
||||||
|
if e.overrideKey != "" {
|
||||||
|
key = e.overrideKey
|
||||||
|
}
|
||||||
|
key = strings.ToLower(key)
|
||||||
|
|
||||||
|
return stream.Subject(partition + "/" + namespace + "/" + key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||||
|
@ -67,8 +62,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
connect := topic == topicServiceHealthConnect
|
connect := topic == topicServiceHealthConnect
|
||||||
entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
|
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta)
|
||||||
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,11 +11,106 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/proto/pbcommon"
|
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
|
||||||
|
// Matches.
|
||||||
|
for desc, tc := range map[string]struct {
|
||||||
|
evt EventPayloadCheckServiceNode
|
||||||
|
req stream.SubscribeRequest
|
||||||
|
}{
|
||||||
|
"default partition and namespace": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{
|
||||||
|
Key: "foo",
|
||||||
|
EnterpriseMeta: structs.EnterpriseMeta{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"mixed casing": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "FoO",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{Key: "foo"},
|
||||||
|
},
|
||||||
|
"override key": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
overrideKey: "bar",
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{Key: "bar"},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(desc, func(t *testing.T) {
|
||||||
|
require.Equal(t, tc.req.Subject(), tc.evt.Subject())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-matches.
|
||||||
|
for desc, tc := range map[string]struct {
|
||||||
|
evt EventPayloadCheckServiceNode
|
||||||
|
req stream.SubscribeRequest
|
||||||
|
}{
|
||||||
|
"different key": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{
|
||||||
|
Key: "bar",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"different partition": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
overridePartition: "bar",
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{
|
||||||
|
Key: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"different namespace": {
|
||||||
|
EventPayloadCheckServiceNode{
|
||||||
|
Value: &structs.CheckServiceNode{
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
overrideNamespace: "bar",
|
||||||
|
},
|
||||||
|
stream.SubscribeRequest{
|
||||||
|
Key: "foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(desc, func(t *testing.T) {
|
||||||
|
require.NotEqual(t, tc.req.Subject(), tc.evt.Subject())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServiceHealthSnapshot(t *testing.T) {
|
func TestServiceHealthSnapshot(t *testing.T) {
|
||||||
store := NewStateStore(nil)
|
store := NewStateStore(nil)
|
||||||
|
|
||||||
|
@ -1771,7 +1866,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
|
||||||
// all events for a particular topic are grouped together. The sort is
|
// all events for a particular topic are grouped together. The sort is
|
||||||
// stable so events with the same key retain their relative order.
|
// stable so events with the same key retain their relative order.
|
||||||
//
|
//
|
||||||
// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey
|
// This sort should match the logic in EventPayloadCheckServiceNode.Subject
|
||||||
// to avoid masking bugs.
|
// to avoid masking bugs.
|
||||||
var cmpPartialOrderEvents = cmp.Options{
|
var cmpPartialOrderEvents = cmp.Options{
|
||||||
cmpopts.SortSlices(func(i, j stream.Event) bool {
|
cmpopts.SortSlices(func(i, j stream.Event) bool {
|
||||||
|
@ -2418,107 +2513,6 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
|
|
||||||
type testCase struct {
|
|
||||||
name string
|
|
||||||
payload EventPayloadCheckServiceNode
|
|
||||||
key string
|
|
||||||
namespace string
|
|
||||||
partition string // TODO(partitions): create test cases for this being set
|
|
||||||
expected bool
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(t *testing.T, tc testCase) {
|
|
||||||
if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" {
|
|
||||||
t.Skip("cant test namespace matching without namespace support")
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace, tc.partition))
|
|
||||||
}
|
|
||||||
|
|
||||||
var testCases = []testCase{
|
|
||||||
{
|
|
||||||
name: "no key or namespace",
|
|
||||||
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no key, with namespace match",
|
|
||||||
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no namespace, with key match",
|
|
||||||
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
|
||||||
key: "srv1",
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "key match, namespace mismatch",
|
|
||||||
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
|
||||||
key: "srv1",
|
|
||||||
namespace: "ns2",
|
|
||||||
expected: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "key mismatch, namespace match",
|
|
||||||
payload: newPayloadCheckServiceNode("srv1", "ns1"),
|
|
||||||
key: "srv2",
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override key match",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""),
|
|
||||||
key: "srv1",
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override key mismatch",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""),
|
|
||||||
key: "proxy",
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override namespace match",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"),
|
|
||||||
key: "proxy",
|
|
||||||
namespace: "ns2",
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override namespace mismatch",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"),
|
|
||||||
key: "proxy",
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override both key and namespace match",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"),
|
|
||||||
key: "srv1",
|
|
||||||
namespace: "ns2",
|
|
||||||
expected: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override both key and namespace mismatch namespace",
|
|
||||||
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"),
|
|
||||||
key: "proxy",
|
|
||||||
namespace: "ns1",
|
|
||||||
expected: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
fn(t, tc)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
|
func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
|
||||||
return EventPayloadCheckServiceNode{
|
return EventPayloadCheckServiceNode{
|
||||||
Value: &structs.CheckServiceNode{
|
Value: &structs.CheckServiceNode{
|
||||||
|
|
|
@ -419,26 +419,14 @@ type nodePayload struct {
|
||||||
node *structs.ServiceNode
|
node *structs.ServiceNode
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p nodePayload) MatchesKey(key, _, partition string) bool {
|
|
||||||
if key == "" && partition == "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.node == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return p.key == key
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
|
func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p nodePayload) Subject() stream.Subject {
|
||||||
|
return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key)
|
||||||
|
}
|
||||||
|
|
||||||
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
|
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
|
||||||
token := &structs.ACLToken{
|
token := &structs.ACLToken{
|
||||||
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
|
||||||
|
|
|
@ -14,6 +14,11 @@ import (
|
||||||
// events which match the Topic.
|
// events which match the Topic.
|
||||||
type Topic fmt.Stringer
|
type Topic fmt.Stringer
|
||||||
|
|
||||||
|
// Subject identifies a portion of a topic for which a subscriber wishes to
|
||||||
|
// receive events (e.g. health events for a particular service) usually the
|
||||||
|
// normalized resource name (including partition and namespace if applicable).
|
||||||
|
type Subject string
|
||||||
|
|
||||||
// Event is a structure with identifiers and a payload. Events are Published to
|
// Event is a structure with identifiers and a payload. Events are Published to
|
||||||
// EventPublisher and returned to Subscribers.
|
// EventPublisher and returned to Subscribers.
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
@ -26,18 +31,16 @@ type Event struct {
|
||||||
// should not modify the state of the payload if the Event is being submitted to
|
// should not modify the state of the payload if the Event is being submitted to
|
||||||
// EventPublisher.Publish.
|
// EventPublisher.Publish.
|
||||||
type Payload interface {
|
type Payload interface {
|
||||||
// MatchesKey must return true if the Payload should be included in a
|
|
||||||
// subscription requested with the key, namespace, and partition.
|
|
||||||
//
|
|
||||||
// Generally this means that the payload matches the key, namespace, and
|
|
||||||
// partition or the payload is a special framing event that should be
|
|
||||||
// returned to every subscription.
|
|
||||||
MatchesKey(key, namespace, partition string) bool
|
|
||||||
|
|
||||||
// HasReadPermission uses the acl.Authorizer to determine if the items in the
|
// HasReadPermission uses the acl.Authorizer to determine if the items in the
|
||||||
// Payload are visible to the request. It returns true if the payload is
|
// Payload are visible to the request. It returns true if the payload is
|
||||||
// authorized for Read, otherwise returns false.
|
// authorized for Read, otherwise returns false.
|
||||||
HasReadPermission(authz acl.Authorizer) bool
|
HasReadPermission(authz acl.Authorizer) bool
|
||||||
|
|
||||||
|
// Subject is used to identify which subscribers should be notified of this
|
||||||
|
// event - e.g. those subscribing to health events for a particular service.
|
||||||
|
// it is usually the normalized resource name (including the partition and
|
||||||
|
// namespace if applicable).
|
||||||
|
Subject() Subject
|
||||||
}
|
}
|
||||||
|
|
||||||
// PayloadEvents is a Payload that may be returned by Subscription.Next when
|
// PayloadEvents is a Payload that may be returned by Subscription.Next when
|
||||||
|
@ -81,14 +84,6 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchesKey filters the PayloadEvents to those which match the key,
|
|
||||||
// namespace, and partition.
|
|
||||||
func (p *PayloadEvents) MatchesKey(key, namespace, partition string) bool {
|
|
||||||
return p.filter(func(event Event) bool {
|
|
||||||
return event.Payload.MatchesKey(key, namespace, partition)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PayloadEvents) Len() int {
|
func (p *PayloadEvents) Len() int {
|
||||||
return len(p.Items)
|
return len(p.Items)
|
||||||
}
|
}
|
||||||
|
@ -101,6 +96,14 @@ func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subject is required to satisfy the Payload interface but is not implemented
|
||||||
|
// by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next
|
||||||
|
// *after* Subject has been used to dispatch the enclosed events to the correct
|
||||||
|
// buffer.
|
||||||
|
func (PayloadEvents) Subject() Subject {
|
||||||
|
panic("PayloadEvents does not implement Subject")
|
||||||
|
}
|
||||||
|
|
||||||
// IsEndOfSnapshot returns true if this is a framing event that indicates the
|
// IsEndOfSnapshot returns true if this is a framing event that indicates the
|
||||||
// snapshot has completed. Subsequent events from Subscription.Next will be
|
// snapshot has completed. Subsequent events from Subscription.Next will be
|
||||||
// streamed as they occur.
|
// streamed as they occur.
|
||||||
|
@ -117,12 +120,15 @@ func (e Event) IsNewSnapshotToFollow() bool {
|
||||||
|
|
||||||
type framingEvent struct{}
|
type framingEvent struct{}
|
||||||
|
|
||||||
func (framingEvent) MatchesKey(string, string, string) bool {
|
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
|
// Subject is required by the Payload interface but is not implemented by
|
||||||
return true
|
// framing events, as they are typically *manually* appended to the correct
|
||||||
|
// buffer and do not need to be routed using a Subject.
|
||||||
|
func (framingEvent) Subject() Subject {
|
||||||
|
panic("framing events do not implement Subject")
|
||||||
}
|
}
|
||||||
|
|
||||||
type endOfSnapshot struct {
|
type endOfSnapshot struct {
|
||||||
|
@ -137,12 +143,15 @@ type closeSubscriptionPayload struct {
|
||||||
tokensSecretIDs []string
|
tokensSecretIDs []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (closeSubscriptionPayload) MatchesKey(string, string, string) bool {
|
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
|
// Subject is required by the Payload interface but it is not implemented by
|
||||||
return false
|
// closeSubscriptionPayload, as this event type is handled separately and not
|
||||||
|
// actually appended to the buffer.
|
||||||
|
func (closeSubscriptionPayload) Subject() Subject {
|
||||||
|
panic("closeSubscriptionPayload does not implement Subject")
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCloseSubscriptionEvent returns a special Event that is handled by the
|
// NewCloseSubscriptionEvent returns a special Event that is handled by the
|
||||||
|
|
|
@ -20,16 +20,16 @@ type EventPublisher struct {
|
||||||
// seconds.
|
// seconds.
|
||||||
snapCacheTTL time.Duration
|
snapCacheTTL time.Duration
|
||||||
|
|
||||||
// This lock protects the topicBuffers, and snapCache
|
// This lock protects the snapCache, topicBuffers and topicBuffer.refs.
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
// topicBuffers stores the head of the linked-list buffer to publish events to
|
// topicBuffers stores the head of the linked-list buffers to publish events to
|
||||||
// for a topic.
|
// for a topic.
|
||||||
topicBuffers map[Topic]*eventBuffer
|
topicBuffers map[topicSubject]*topicBuffer
|
||||||
|
|
||||||
// snapCache if a cache of EventSnapshots indexed by topic and key.
|
// snapCache if a cache of EventSnapshots indexed by topic and subject.
|
||||||
// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
|
// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
|
||||||
snapCache map[Topic]map[string]*eventSnapshot
|
snapCache map[topicSubject]*eventSnapshot
|
||||||
|
|
||||||
subscriptions *subscriptions
|
subscriptions *subscriptions
|
||||||
|
|
||||||
|
@ -41,6 +41,13 @@ type EventPublisher struct {
|
||||||
snapshotHandlers SnapshotHandlers
|
snapshotHandlers SnapshotHandlers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// topicSubject is used as a map key when accessing topic buffers and cached
|
||||||
|
// snapshots.
|
||||||
|
type topicSubject struct {
|
||||||
|
Topic Topic
|
||||||
|
Subject Subject
|
||||||
|
}
|
||||||
|
|
||||||
type subscriptions struct {
|
type subscriptions struct {
|
||||||
// lock for byToken. If both subscription.lock and EventPublisher.lock need
|
// lock for byToken. If both subscription.lock and EventPublisher.lock need
|
||||||
// to be held, EventPublisher.lock MUST always be acquired first.
|
// to be held, EventPublisher.lock MUST always be acquired first.
|
||||||
|
@ -54,6 +61,14 @@ type subscriptions struct {
|
||||||
byToken map[string]map[*SubscribeRequest]*Subscription
|
byToken map[string]map[*SubscribeRequest]*Subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// topicBuffer augments the eventBuffer with a reference counter, enabling
|
||||||
|
// clean up of unused buffers once there are no longer any subscribers for
|
||||||
|
// the given topic and key.
|
||||||
|
type topicBuffer struct {
|
||||||
|
refs int // refs is guarded by EventPublisher.lock.
|
||||||
|
buf *eventBuffer
|
||||||
|
}
|
||||||
|
|
||||||
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
|
||||||
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
|
||||||
// The nil Topic is reserved and should not be used.
|
// The nil Topic is reserved and should not be used.
|
||||||
|
@ -79,8 +94,8 @@ type SnapshotAppender interface {
|
||||||
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
|
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
|
||||||
e := &EventPublisher{
|
e := &EventPublisher{
|
||||||
snapCacheTTL: snapCacheTTL,
|
snapCacheTTL: snapCacheTTL,
|
||||||
topicBuffers: make(map[Topic]*eventBuffer),
|
topicBuffers: make(map[topicSubject]*topicBuffer),
|
||||||
snapCache: make(map[Topic]map[string]*eventSnapshot),
|
snapCache: make(map[topicSubject]*eventSnapshot),
|
||||||
publishCh: make(chan []Event, 64),
|
publishCh: make(chan []Event, 64),
|
||||||
subscriptions: &subscriptions{
|
subscriptions: &subscriptions{
|
||||||
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
||||||
|
@ -116,36 +131,59 @@ func (e *EventPublisher) Run(ctx context.Context) {
|
||||||
// publishEvent appends the events to any applicable topic buffers. It handles
|
// publishEvent appends the events to any applicable topic buffers. It handles
|
||||||
// any closeSubscriptionPayload events by closing associated subscriptions.
|
// any closeSubscriptionPayload events by closing associated subscriptions.
|
||||||
func (e *EventPublisher) publishEvent(events []Event) {
|
func (e *EventPublisher) publishEvent(events []Event) {
|
||||||
eventsByTopic := make(map[Topic][]Event)
|
groupedEvents := make(map[topicSubject][]Event)
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
|
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
|
||||||
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
|
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
|
groupKey := topicSubject{event.Topic, event.Payload.Subject()}
|
||||||
|
groupedEvents[groupKey] = append(groupedEvents[groupKey], event)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
for topic, events := range eventsByTopic {
|
for groupKey, events := range groupedEvents {
|
||||||
e.getTopicBuffer(topic).Append(events)
|
// Note: bufferForPublishing returns nil if there are no subscribers for the
|
||||||
|
// given topic and subject, in which case events will be dropped on the floor and
|
||||||
|
// future subscribers will catch up by consuming the snapshot.
|
||||||
|
if buf := e.bufferForPublishing(groupKey); buf != nil {
|
||||||
|
buf.Append(events)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTopicBuffer for the topic. Creates a new event buffer if one does not
|
// bufferForSubscription returns the topic event buffer to which events for the
|
||||||
// already exist.
|
// given topic and key will be appended. If no such buffer exists, a new buffer
|
||||||
|
// will be created.
|
||||||
//
|
//
|
||||||
// EventPublisher.lock must be held to call this method.
|
// Warning: e.lock MUST be held when calling this function.
|
||||||
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
|
func (e *EventPublisher) bufferForSubscription(key topicSubject) *topicBuffer {
|
||||||
buf, ok := e.topicBuffers[topic]
|
buf, ok := e.topicBuffers[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
buf = newEventBuffer()
|
buf = &topicBuffer{
|
||||||
e.topicBuffers[topic] = buf
|
buf: newEventBuffer(),
|
||||||
|
}
|
||||||
|
e.topicBuffers[key] = buf
|
||||||
}
|
}
|
||||||
|
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bufferForPublishing returns the event buffer to which events for the given
|
||||||
|
// topic and key should be appended. nil will be returned if there are no
|
||||||
|
// subscribers for the given topic and key.
|
||||||
|
//
|
||||||
|
// Warning: e.lock MUST be held when calling this function.
|
||||||
|
func (e *EventPublisher) bufferForPublishing(key topicSubject) *eventBuffer {
|
||||||
|
buf, ok := e.topicBuffers[key]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return buf.buf
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe returns a new Subscription for the given request. A subscription
|
// Subscribe returns a new Subscription for the given request. A subscription
|
||||||
// will receive an initial snapshot of events matching the request if req.Index > 0.
|
// will receive an initial snapshot of events matching the request if req.Index > 0.
|
||||||
// After the snapshot, events will be streamed as they are created.
|
// After the snapshot, events will be streamed as they are created.
|
||||||
|
@ -163,7 +201,34 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
topicHead := e.getTopicBuffer(req.Topic).Head()
|
topicBuf := e.bufferForSubscription(req.topicSubject())
|
||||||
|
topicBuf.refs++
|
||||||
|
|
||||||
|
// freeBuf is used to free the topic buffer once there are no remaining
|
||||||
|
// subscribers for the given topic and key.
|
||||||
|
//
|
||||||
|
// Note: it's called by Subcription.Unsubscribe which has its own side-effects
|
||||||
|
// that are made without holding e.lock (so there's a moment where the ref
|
||||||
|
// counter is inconsistent with the subscription map) — in practice this is
|
||||||
|
// fine, we don't need these things to be strongly consistent. The alternative
|
||||||
|
// would be to hold both locks, which introduces the risk of deadlocks.
|
||||||
|
freeBuf := func() {
|
||||||
|
e.lock.Lock()
|
||||||
|
defer e.lock.Unlock()
|
||||||
|
|
||||||
|
topicBuf.refs--
|
||||||
|
|
||||||
|
if topicBuf.refs == 0 {
|
||||||
|
delete(e.topicBuffers, req.topicSubject())
|
||||||
|
|
||||||
|
// Evict cached snapshot too because the topic buffer will have been spliced
|
||||||
|
// onto it. If we don't do this, any new subscribers started before the cache
|
||||||
|
// TTL is reached will get "stuck" waiting on the old buffer.
|
||||||
|
delete(e.snapCache, req.topicSubject())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
topicHead := topicBuf.buf.Head()
|
||||||
|
|
||||||
// If the client view is fresh, resume the stream.
|
// If the client view is fresh, resume the stream.
|
||||||
if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
|
if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
|
||||||
|
@ -173,7 +238,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
// the subscription will receive new events.
|
// the subscription will receive new events.
|
||||||
next, _ := topicHead.NextNoBlock()
|
next, _ := topicHead.NextNoBlock()
|
||||||
buf.AppendItem(next)
|
buf.AppendItem(next)
|
||||||
return e.subscriptions.add(req, subscriptionHead), nil
|
return e.subscriptions.add(req, subscriptionHead, freeBuf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
snapFromCache := e.getCachedSnapshotLocked(req)
|
snapFromCache := e.getCachedSnapshotLocked(req)
|
||||||
|
@ -186,7 +251,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
|
|
||||||
// If the request.Index is 0 the client has no view, send a full snapshot.
|
// If the request.Index is 0 the client has no view, send a full snapshot.
|
||||||
if req.Index == 0 {
|
if req.Index == 0 {
|
||||||
return e.subscriptions.add(req, snapFromCache.First), nil
|
return e.subscriptions.add(req, snapFromCache.First, freeBuf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise the request has an Index, the client view is stale and must be reset
|
// otherwise the request has an Index, the client view is stale and must be reset
|
||||||
|
@ -197,11 +262,17 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
|
||||||
Payload: newSnapshotToFollow{},
|
Payload: newSnapshotToFollow{},
|
||||||
}})
|
}})
|
||||||
result.buffer.AppendItem(snapFromCache.First)
|
result.buffer.AppendItem(snapFromCache.First)
|
||||||
return e.subscriptions.add(req, result.First), nil
|
return e.subscriptions.add(req, result.First, freeBuf), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
|
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem, freeBuf func()) *Subscription {
|
||||||
sub := newSubscription(*req, head, s.unsubscribe(req))
|
// We wrap freeBuf in a sync.Once as it's expected that Subscription.unsub is
|
||||||
|
// idempotent, but freeBuf decrements the reference counter on every call.
|
||||||
|
var once sync.Once
|
||||||
|
sub := newSubscription(*req, head, func() {
|
||||||
|
s.unsubscribe(req)
|
||||||
|
once.Do(freeBuf)
|
||||||
|
})
|
||||||
|
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
@ -228,24 +299,17 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// unsubscribe returns a function that the subscription will call to remove
|
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
|
||||||
// itself from the subsByToken.
|
s.lock.Lock()
|
||||||
// This function is returned as a closure so that the caller doesn't need to keep
|
defer s.lock.Unlock()
|
||||||
// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the
|
|
||||||
// wrong pointer.
|
|
||||||
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
|
|
||||||
return func() {
|
|
||||||
s.lock.Lock()
|
|
||||||
defer s.lock.Unlock()
|
|
||||||
|
|
||||||
subsByToken, ok := s.byToken[req.Token]
|
subsByToken, ok := s.byToken[req.Token]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(subsByToken, req)
|
delete(subsByToken, req)
|
||||||
if len(subsByToken) == 0 {
|
if len(subsByToken) == 0 {
|
||||||
delete(s.byToken, req.Token)
|
delete(s.byToken, req.Token)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,13 +326,7 @@ func (s *subscriptions) closeAll() {
|
||||||
|
|
||||||
// EventPublisher.lock must be held to call this method.
|
// EventPublisher.lock must be held to call this method.
|
||||||
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
|
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
|
||||||
topicSnaps, ok := e.snapCache[req.Topic]
|
snap, ok := e.snapCache[req.topicSubject()]
|
||||||
if !ok {
|
|
||||||
topicSnaps = make(map[string]*eventSnapshot)
|
|
||||||
e.snapCache[req.Topic] = topicSnaps
|
|
||||||
}
|
|
||||||
|
|
||||||
snap, ok := topicSnaps[snapCacheKey(req)]
|
|
||||||
if ok && snap.err() == nil {
|
if ok && snap.err() == nil {
|
||||||
return snap
|
return snap
|
||||||
}
|
}
|
||||||
|
@ -280,16 +338,12 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev
|
||||||
if e.snapCacheTTL == 0 {
|
if e.snapCacheTTL == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
e.snapCache[req.Topic][snapCacheKey(req)] = snap
|
e.snapCache[req.topicSubject()] = snap
|
||||||
|
|
||||||
// Setup a cache eviction
|
// Setup a cache eviction
|
||||||
time.AfterFunc(e.snapCacheTTL, func() {
|
time.AfterFunc(e.snapCacheTTL, func() {
|
||||||
e.lock.Lock()
|
e.lock.Lock()
|
||||||
defer e.lock.Unlock()
|
defer e.lock.Unlock()
|
||||||
delete(e.snapCache[req.Topic], snapCacheKey(req))
|
delete(e.snapCache, req.topicSubject())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func snapCacheKey(req *SubscribeRequest) string {
|
|
||||||
return req.Partition + "/" + req.Namespace + "/" + req.Key
|
|
||||||
}
|
|
||||||
|
|
|
@ -56,6 +56,13 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
|
||||||
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
|
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
|
||||||
}
|
}
|
||||||
require.Equal(t, expected, next)
|
require.Equal(t, expected, next)
|
||||||
|
|
||||||
|
// Subscriber should not see events for other keys
|
||||||
|
publisher.Publish([]Event{{
|
||||||
|
Topic: testTopic,
|
||||||
|
Payload: simplePayload{key: "other-key", value: "this-should-not-reach-the-subscriber"},
|
||||||
|
}})
|
||||||
|
assertNoResult(t, eventCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
var testSnapshotEvent = Event{
|
var testSnapshotEvent = Event{
|
||||||
|
@ -70,17 +77,12 @@ type simplePayload struct {
|
||||||
noReadPerm bool
|
noReadPerm bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p simplePayload) MatchesKey(key, _, _ string) bool {
|
|
||||||
if key == "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return p.key == key
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
|
func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
|
||||||
return !p.noReadPerm
|
return !p.noReadPerm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p simplePayload) Subject() Subject { return Subject("default/default/" + p.key) }
|
||||||
|
|
||||||
func newTestSnapshotHandlers() SnapshotHandlers {
|
func newTestSnapshotHandlers() SnapshotHandlers {
|
||||||
return SnapshotHandlers{
|
return SnapshotHandlers{
|
||||||
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||||
|
@ -190,9 +192,10 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
|
||||||
|
|
||||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||||
go publisher.Run(ctx)
|
go publisher.Run(ctx)
|
||||||
|
|
||||||
sub, err := publisher.Subscribe(req)
|
sub, err := publisher.Subscribe(req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
|
||||||
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
return 0, fmt.Errorf("error should not be seen, cache should have been used")
|
||||||
|
@ -200,6 +203,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
|
||||||
|
|
||||||
sub, err = publisher.Subscribe(req)
|
sub, err = publisher.Subscribe(req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
eventCh := runSubscription(ctx, sub)
|
eventCh := runSubscription(ctx, sub)
|
||||||
next := getNextEvent(t, eventCh)
|
next := getNextEvent(t, eventCh)
|
||||||
|
@ -233,7 +237,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
|
||||||
|
|
||||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||||
go publisher.Run(ctx)
|
go publisher.Run(ctx)
|
||||||
// Include the same event in the topicBuffer
|
|
||||||
|
simulateExistingSubscriber(t, publisher, req)
|
||||||
|
|
||||||
|
// Publish the testSnapshotEvent, to ensure that it is skipped over when
|
||||||
|
// splicing the topic buffer onto the snapshot.
|
||||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||||
|
|
||||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||||
|
@ -338,7 +346,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
|
||||||
|
|
||||||
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||||
go publisher.Run(ctx)
|
go publisher.Run(ctx)
|
||||||
// Include the same event in the topicBuffer
|
|
||||||
|
simulateExistingSubscriber(t, publisher, req)
|
||||||
|
|
||||||
|
// Publish the testSnapshotEvent, to ensure that it is skipped over when
|
||||||
|
// splicing the topic buffer onto the snapshot.
|
||||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||||
|
|
||||||
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
runStep(t, "start a subscription and unsub", func(t *testing.T) {
|
||||||
|
@ -421,7 +433,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
|
||||||
|
|
||||||
publisher := NewEventPublisher(handlers, time.Second)
|
publisher := NewEventPublisher(handlers, time.Second)
|
||||||
go publisher.Run(ctx)
|
go publisher.Run(ctx)
|
||||||
// Include the same events in the topicBuffer
|
|
||||||
|
simulateExistingSubscriber(t, publisher, req)
|
||||||
|
|
||||||
|
// Publish the events, to ensure they are is skipped over when splicing the
|
||||||
|
// topic buffer onto the snapshot.
|
||||||
publisher.publishEvent([]Event{testSnapshotEvent})
|
publisher.publishEvent([]Event{testSnapshotEvent})
|
||||||
publisher.publishEvent([]Event{nextEvent})
|
publisher.publishEvent([]Event{nextEvent})
|
||||||
|
|
||||||
|
@ -495,3 +511,60 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "subscription was closed by unsubscribe")
|
require.Contains(t, err.Error(), "subscription was closed by unsubscribe")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) {
|
||||||
|
req := &SubscribeRequest{
|
||||||
|
Topic: testTopic,
|
||||||
|
Key: "sub-key",
|
||||||
|
}
|
||||||
|
|
||||||
|
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
|
||||||
|
|
||||||
|
sub1, err := publisher.Subscribe(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Expect a topic buffer and snapshot to have been created.
|
||||||
|
publisher.lock.Lock()
|
||||||
|
require.NotNil(t, publisher.topicBuffers[req.topicSubject()])
|
||||||
|
require.NotNil(t, publisher.snapCache[req.topicSubject()])
|
||||||
|
publisher.lock.Unlock()
|
||||||
|
|
||||||
|
// Create another subscription and close the old one, to ensure the buffer and
|
||||||
|
// snapshot stick around as long as there's at least one subscriber.
|
||||||
|
sub2, err := publisher.Subscribe(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
sub1.Unsubscribe()
|
||||||
|
|
||||||
|
publisher.lock.Lock()
|
||||||
|
require.NotNil(t, publisher.topicBuffers[req.topicSubject()])
|
||||||
|
require.NotNil(t, publisher.snapCache[req.topicSubject()])
|
||||||
|
publisher.lock.Unlock()
|
||||||
|
|
||||||
|
// Close the other subscription and expect the buffer and snapshot to have
|
||||||
|
// been cleaned up.
|
||||||
|
sub2.Unsubscribe()
|
||||||
|
|
||||||
|
publisher.lock.Lock()
|
||||||
|
require.Nil(t, publisher.topicBuffers[req.topicSubject()])
|
||||||
|
require.Nil(t, publisher.snapCache[req.topicSubject()])
|
||||||
|
publisher.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulateExistingSubscriber creates a subscription that remains open throughout
|
||||||
|
// a test to prevent the topic buffer getting garbage-collected.
|
||||||
|
//
|
||||||
|
// It evicts the created snapshot from the cache immediately (simulating an
|
||||||
|
// existing subscription that has been open long enough the snapshot's TTL has
|
||||||
|
// been reached) so you can test snapshots getting created afresh.
|
||||||
|
func simulateExistingSubscriber(t *testing.T, p *EventPublisher, r *SubscribeRequest) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
sub, err := p.Subscribe(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(sub.Unsubscribe)
|
||||||
|
|
||||||
|
p.lock.Lock()
|
||||||
|
delete(p.snapCache, r.topicSubject())
|
||||||
|
p.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
|
@ -20,119 +20,6 @@ func newSimpleEvent(key string, index uint64) Event {
|
||||||
return Event{Index: index, Payload: simplePayload{key: key}}
|
return Event{Index: index, Payload: simplePayload{key: key}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPayloadEvents_FilterByKey(t *testing.T) {
|
|
||||||
type testCase struct {
|
|
||||||
name string
|
|
||||||
req SubscribeRequest
|
|
||||||
events []Event
|
|
||||||
expectEvent bool
|
|
||||||
expected *PayloadEvents
|
|
||||||
expectedCap int
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(t *testing.T, tc testCase) {
|
|
||||||
events := make([]Event, 0, 5)
|
|
||||||
events = append(events, tc.events...)
|
|
||||||
|
|
||||||
pe := &PayloadEvents{Items: events}
|
|
||||||
ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace, tc.req.Partition)
|
|
||||||
require.Equal(t, tc.expectEvent, ok)
|
|
||||||
if !tc.expectEvent {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, tc.expected, pe)
|
|
||||||
// test if there was a new array allocated or not
|
|
||||||
require.Equal(t, tc.expectedCap, cap(pe.Items))
|
|
||||||
}
|
|
||||||
|
|
||||||
var testCases = []testCase{
|
|
||||||
{
|
|
||||||
name: "all events match, no key or namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("One", 102),
|
|
||||||
newSimpleEvent("Two", 102)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: newPayloadEvents(
|
|
||||||
newSimpleEvent("One", 102),
|
|
||||||
newSimpleEvent("Two", 102)),
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "all events match, no namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 103),
|
|
||||||
newSimpleEvent("Same", 103)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: newPayloadEvents(
|
|
||||||
newSimpleEvent("Same", 103),
|
|
||||||
newSimpleEvent("Same", 103)),
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "all events match, no key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("Something", "apps"),
|
|
||||||
newNSEvent("Other", "apps")},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: newPayloadEvents(
|
|
||||||
newNSEvent("Something", "apps"),
|
|
||||||
newNSEvent("Other", "apps")),
|
|
||||||
expectedCap: 5,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "some evens match, no namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 104),
|
|
||||||
newSimpleEvent("Other", 104),
|
|
||||||
newSimpleEvent("Same", 104)},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: newPayloadEvents(
|
|
||||||
newSimpleEvent("Same", 104),
|
|
||||||
newSimpleEvent("Same", 104)),
|
|
||||||
expectedCap: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "some events match, no key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("app1", "apps"),
|
|
||||||
newNSEvent("db1", "dbs"),
|
|
||||||
newNSEvent("app2", "apps")},
|
|
||||||
expectEvent: true,
|
|
||||||
expected: newPayloadEvents(
|
|
||||||
newNSEvent("app1", "apps"),
|
|
||||||
newNSEvent("app2", "apps")),
|
|
||||||
expectedCap: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no events match key",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
|
|
||||||
events: []Event{
|
|
||||||
newSimpleEvent("Same", 0),
|
|
||||||
newSimpleEvent("Same", 0)},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "no events match namespace",
|
|
||||||
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
|
|
||||||
events: []Event{
|
|
||||||
newNSEvent("app1", "group1"),
|
|
||||||
newNSEvent("app2", "group2")},
|
|
||||||
expectEvent: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
fn(t, tc)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(partitions)
|
// TODO(partitions)
|
||||||
func newNSEvent(key, namespace string) Event {
|
func newNSEvent(key, namespace string) Event {
|
||||||
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
|
||||||
|
@ -146,12 +33,6 @@ type nsPayload struct {
|
||||||
value string
|
value string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p nsPayload) MatchesKey(key, namespace, partition string) bool {
|
|
||||||
return (key == "" || key == p.key) &&
|
|
||||||
(namespace == "" || namespace == p.namespace) &&
|
|
||||||
(partition == "" || partition == p.partition)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPayloadEvents_HasReadPermission(t *testing.T) {
|
func TestPayloadEvents_HasReadPermission(t *testing.T) {
|
||||||
t.Run("some events filtered", func(t *testing.T) {
|
t.Run("some events filtered", func(t *testing.T) {
|
||||||
ep := newPayloadEvents(
|
ep := newPayloadEvents(
|
||||||
|
|
|
@ -4,7 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -59,12 +62,9 @@ type SubscribeRequest struct {
|
||||||
// be returned by the subscription. A blank key will return all events. Key
|
// be returned by the subscription. A blank key will return all events. Key
|
||||||
// is generally the name of the resource.
|
// is generally the name of the resource.
|
||||||
Key string
|
Key string
|
||||||
// Namespace used to filter events in the topic. Only events matching the
|
// EnterpriseMeta is used to filter events in the topic. Only events matching
|
||||||
// namespace will be returned by the subscription.
|
// the partition and namespace will be returned by the subscription.
|
||||||
Namespace string
|
EnterpriseMeta structs.EnterpriseMeta
|
||||||
// Partition used to filter events in the topic. Only events matching the
|
|
||||||
// partition will be returned by the subscription.
|
|
||||||
Partition string // TODO(partitions): make this work
|
|
||||||
// Token that was used to authenticate the request. If any ACL policy
|
// Token that was used to authenticate the request. If any ACL policy
|
||||||
// changes impact the token the subscription will be forcefully closed.
|
// changes impact the token the subscription will be forcefully closed.
|
||||||
Token string
|
Token string
|
||||||
|
@ -74,6 +74,19 @@ type SubscribeRequest struct {
|
||||||
Index uint64
|
Index uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (req SubscribeRequest) Subject() Subject {
|
||||||
|
var (
|
||||||
|
partition = req.EnterpriseMeta.PartitionOrDefault()
|
||||||
|
namespace = req.EnterpriseMeta.NamespaceOrDefault()
|
||||||
|
key = strings.ToLower(req.Key)
|
||||||
|
)
|
||||||
|
return Subject(partition + "/" + namespace + "/" + key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (req SubscribeRequest) topicSubject() topicSubject {
|
||||||
|
return topicSubject{req.Topic, req.Subject()}
|
||||||
|
}
|
||||||
|
|
||||||
// newSubscription return a new subscription. The caller is responsible for
|
// newSubscription return a new subscription. The caller is responsible for
|
||||||
// calling Unsubscribe when it is done with the subscription, to free resources.
|
// calling Unsubscribe when it is done with the subscription, to free resources.
|
||||||
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
|
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
|
||||||
|
@ -104,11 +117,7 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
|
||||||
if len(next.Events) == 0 {
|
if len(next.Events) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
event := newEventFromBatch(s.req, next.Events)
|
return newEventFromBatch(s.req, next.Events), nil
|
||||||
if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace, s.req.Partition) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return event, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,10 +6,32 @@ import (
|
||||||
time "time"
|
time "time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func noopUnSub() {}
|
func noopUnSub() {}
|
||||||
|
|
||||||
|
func TestSubscription_Subject(t *testing.T) {
|
||||||
|
for desc, tc := range map[string]struct {
|
||||||
|
req SubscribeRequest
|
||||||
|
sub Subject
|
||||||
|
}{
|
||||||
|
"default partition and namespace": {
|
||||||
|
SubscribeRequest{Key: "foo", EnterpriseMeta: structs.EnterpriseMeta{}},
|
||||||
|
"default/default/foo",
|
||||||
|
},
|
||||||
|
"mixed casing": {
|
||||||
|
SubscribeRequest{Key: "BaZ"},
|
||||||
|
"default/default/baz",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(desc, func(t *testing.T) {
|
||||||
|
require.Equal(t, tc.sub, tc.req.Subject())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubscription(t *testing.T) {
|
func TestSubscription(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
@ -59,10 +81,6 @@ func TestSubscription(t *testing.T) {
|
||||||
"Event should have been delivered after short time, took %s", elapsed)
|
"Event should have been delivered after short time, took %s", elapsed)
|
||||||
require.Equal(t, index, got.Index)
|
require.Equal(t, index, got.Index)
|
||||||
|
|
||||||
// Event with wrong key should not be delivered. Deliver a good message right
|
|
||||||
// so we don't have to block test thread forever or cancel func yet.
|
|
||||||
index++
|
|
||||||
publishTestEvent(index, eb, "nope")
|
|
||||||
index++
|
index++
|
||||||
publishTestEvent(index, eb, "test")
|
publishTestEvent(index, eb, "test")
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,10 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.Key == "" {
|
||||||
|
return status.Error(codes.InvalidArgument, "Key is required")
|
||||||
|
}
|
||||||
|
|
||||||
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta))
|
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -89,12 +93,11 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
|
||||||
|
|
||||||
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
|
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
|
||||||
return &stream.SubscribeRequest{
|
return &stream.SubscribeRequest{
|
||||||
Topic: req.Topic,
|
Topic: req.Topic,
|
||||||
Key: req.Key,
|
Key: req.Key,
|
||||||
Token: req.Token,
|
EnterpriseMeta: entMeta,
|
||||||
Index: req.Index,
|
Token: req.Token,
|
||||||
Namespace: entMeta.NamespaceOrEmpty(),
|
Index: req.Index,
|
||||||
Partition: entMeta.PartitionOrEmpty(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,33 @@ import (
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestServer_Subscribe_KeyIsRequired(t *testing.T) {
|
||||||
|
backend, err := newTestBackend()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(logError(t, conn.Close))
|
||||||
|
|
||||||
|
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||||
|
|
||||||
|
stream, err := client.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||||
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
|
Key: "",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = stream.Recv()
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
|
||||||
|
require.Contains(t, err.Error(), "Key is required")
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
||||||
backend, err := newTestBackend()
|
backend, err := newTestBackend()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -878,6 +905,8 @@ func assertNoEvents(t *testing.T, chEvents chan eventOrError) {
|
||||||
|
|
||||||
func logError(t *testing.T, f func() error) func() {
|
func logError(t *testing.T, f func() error) func() {
|
||||||
return func() {
|
return func() {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
if err := f(); err != nil {
|
if err := f(); err != nil {
|
||||||
t.Logf(err.Error())
|
t.Logf(err.Error())
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,9 +90,8 @@ type SubscribeRequest struct {
|
||||||
Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"`
|
Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"`
|
||||||
// Key is a topic-specific identifier that restricts the scope of the
|
// Key is a topic-specific identifier that restricts the scope of the
|
||||||
// subscription to only events pertaining to that identifier. For example,
|
// subscription to only events pertaining to that identifier. For example,
|
||||||
// to receive events for a single service, the service's name is
|
// to receive events for a single service, the service's name is specified
|
||||||
// specified as the key. An empty key indicates that all events in the topic
|
// as the key.
|
||||||
// are of interest.
|
|
||||||
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
|
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
|
||||||
// Token is the ACL token to authenticate the request. The token must have
|
// Token is the ACL token to authenticate the request. The token must have
|
||||||
// sufficient privileges to read the requested information otherwise events
|
// sufficient privileges to read the requested information otherwise events
|
||||||
|
|
|
@ -51,9 +51,8 @@ message SubscribeRequest {
|
||||||
|
|
||||||
// Key is a topic-specific identifier that restricts the scope of the
|
// Key is a topic-specific identifier that restricts the scope of the
|
||||||
// subscription to only events pertaining to that identifier. For example,
|
// subscription to only events pertaining to that identifier. For example,
|
||||||
// to receive events for a single service, the service's name is
|
// to receive events for a single service, the service's name is specified
|
||||||
// specified as the key. An empty key indicates that all events in the topic
|
// as the key.
|
||||||
// are of interest.
|
|
||||||
string Key = 2;
|
string Key = 2;
|
||||||
|
|
||||||
// Token is the ACL token to authenticate the request. The token must have
|
// Token is the ACL token to authenticate the request. The token must have
|
||||||
|
|
Loading…
Reference in New Issue