From dcacfd35488dba17947069f970c28d4938d71178 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 5 Nov 2020 16:49:59 -0500 Subject: [PATCH 1/5] stream: Remove unused method --- agent/consul/stream/event.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 09d96ee6d..3621f2047 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -82,10 +82,6 @@ func (e PayloadEvents) FilterByKey(_, _ string) bool { return true } -func (e PayloadEvents) Events() []Event { - return e -} - // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. From 8a26bca020fd5fcbe47db810dddc8bb7ee761f67 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 5 Nov 2020 17:50:17 -0500 Subject: [PATCH 2/5] stream: move event filtering to PayloadEvents Removes the weirdness around PayloadEvents.FilterByKey --- agent/consul/stream/event.go | 59 ++++---- agent/consul/stream/event_test.go | 131 ++++++++++++++++++ agent/consul/stream/subscription.go | 18 +-- agent/consul/stream/subscription_test.go | 166 ++++------------------- 4 files changed, 184 insertions(+), 190 deletions(-) diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 3621f2047..09369920b 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -24,62 +24,55 @@ type Payload interface { // Generally this means that the payload matches the key and namespace or // the payload is a special framing event that should be returned to every // subscription. + // TODO: rename to MatchesKey FilterByKey(key, namespace string) bool } -// Len returns the number of events contained within this event. If the Payload -// is a []Event, the length of that slice is returned. Otherwise 1 is returned. -func (e Event) Len() int { - if batch, ok := e.Payload.(PayloadEvents); ok { - return len(batch) - } - return 1 +// PayloadEvents is an Payload which contains multiple Events. +type PayloadEvents struct { + Items []Event } -// Filter returns an Event filtered to only those Events where f returns true. -// If the second return value is false, every Event was removed by the filter. -func (e Event) Filter(f func(Event) bool) (Event, bool) { - batch, ok := e.Payload.(PayloadEvents) - if !ok { - return e, f(e) - } +func NewPayloadEvents(items ...Event) *PayloadEvents { + return &PayloadEvents{Items: items} +} + +func (p *PayloadEvents) filter(f func(Event) bool) bool { + items := p.Items // To avoid extra allocations, iterate over the list of events first and // get a count of the total desired size. This trades off some extra cpu // time in the worse case (when not all items match the filter), for // fewer memory allocations. var size int - for idx := range batch { - if f(batch[idx]) { + for idx := range items { + if f(items[idx]) { size++ } } - if len(batch) == size || size == 0 { - return e, size != 0 + if len(items) == size || size == 0 { + return size != 0 } - filtered := make(PayloadEvents, 0, size) - for idx := range batch { - event := batch[idx] + filtered := make([]Event, 0, size) + for idx := range items { + event := items[idx] if f(event) { filtered = append(filtered, event) } } - if len(filtered) == 0 { - return e, false - } - e.Payload = filtered - return e, true + p.Items = filtered + return true } -// PayloadEvents is an Payload which contains multiple Events. -type PayloadEvents []Event +func (p *PayloadEvents) FilterByKey(key, namespace string) bool { + return p.filter(func(event Event) bool { + return event.Payload.FilterByKey(key, namespace) + }) +} -// TODO: this method is not called, but needs to exist so that we can store -// a slice of events as a payload. In the future we should be able to refactor -// Event.Filter so that this FilterByKey includes the re-slicing. -func (e PayloadEvents) FilterByKey(_, _ string) bool { - return true +func (p *PayloadEvents) Len() int { + return len(p.Items) } // IsEndOfSnapshot returns true if this is a framing event that indicates the diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index 182f0d512..f3daba802 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -15,3 +15,134 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) { require.False(t, e.IsEndOfSnapshot()) }) } + +func newSimpleEvent(key string, index uint64) Event { + return Event{Index: index, Payload: simplePayload{key: key}} +} + +func TestPayloadEvents_FilterByKey(t *testing.T) { + type testCase struct { + name string + req SubscribeRequest + events []Event + expectEvent bool + expected *PayloadEvents + expectedCap int + } + + fn := func(t *testing.T, tc testCase) { + events := make([]Event, 0, 5) + events = append(events, tc.events...) + + pe := &PayloadEvents{Items: events} + ok := pe.FilterByKey(tc.req.Key, tc.req.Namespace) + require.Equal(t, tc.expectEvent, ok) + if !tc.expectEvent { + return + } + + require.Equal(t, tc.expected, pe) + // test if there was a new array allocated or not + require.Equal(t, tc.expectedCap, cap(pe.Items)) + } + + var testCases = []testCase{ + { + name: "all events match, no key or namespace", + req: SubscribeRequest{Topic: testTopic}, + events: []Event{ + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)), + expectedCap: 5, + }, + { + name: "all events match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)), + expectedCap: 5, + }, + { + name: "all events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")}, + expectEvent: true, + expected: NewPayloadEvents( + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")), + expectedCap: 5, + }, + { + name: "some evens match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 104), + newSimpleEvent("Same", 104)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("Same", 104), + newSimpleEvent("Same", 104)), + expectedCap: 2, + }, + { + name: "some events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "apps"), + newNSEvent("db1", "dbs"), + newNSEvent("app2", "apps")}, + expectEvent: true, + expected: NewPayloadEvents( + newNSEvent("app1", "apps"), + newNSEvent("app2", "apps")), + expectedCap: 2, + }, + { + name: "no events match key", + req: SubscribeRequest{Topic: testTopic, Key: "Other"}, + events: []Event{ + newSimpleEvent("Same", 0), + newSimpleEvent("Same", 0)}, + }, + { + name: "no events match namespace", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "group1"), + newNSEvent("app2", "group2")}, + expectEvent: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +func newNSEvent(key, namespace string) Event { + return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} +} + +type nsPayload struct { + key string + namespace string + value string +} + +func (p nsPayload) FilterByKey(key, namespace string) bool { + return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) +} diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 472b0ce90..5dc45efdf 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) { if len(next.Events) == 0 { continue } - event, ok := filterByKey(s.req, next.Events) - if !ok { + event := newEventFromBatch(s.req, next.Events) + if !event.Payload.FilterByKey(s.req.Key, s.req.Namespace) { continue } return event, nil @@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { return Event{ Topic: req.Topic, Index: first.Index, - Payload: PayloadEvents(events), + Payload: NewPayloadEvents(events...), } } -func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { - event := newEventFromBatch(req, events) - if req.Key == "" && req.Namespace == "" { - return event, true - } - - fn := func(e Event) bool { - return e.Payload.FilterByKey(req.Key, req.Namespace) - } - return event.Filter(fn) -} - // Close the subscription. Subscribers will receive an error when they call Next, // and will need to perform a new Subscribe request. // It is safe to call from any goroutine. diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 2c192b184..bec2538cb 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { b.Append([]Event{e}) } -func newSimpleEvent(key string, index uint64) Event { - return Event{Index: index, Payload: simplePayload{key: key}} -} - -func TestFilterByKey(t *testing.T) { - type testCase struct { - name string - req SubscribeRequest - events []Event - expectEvent bool - expected Event - expectedCap int - } - - fn := func(t *testing.T, tc testCase) { - events := make(PayloadEvents, 0, 5) - events = append(events, tc.events...) - - actual, ok := filterByKey(tc.req, events) - require.Equal(t, tc.expectEvent, ok) - if !tc.expectEvent { - return +func TestNewEventsFromBatch(t *testing.T) { + t.Run("single item", func(t *testing.T) { + first := Event{ + Topic: testTopic, + Index: 1234, + Payload: simplePayload{key: "key"}, } - - require.Equal(t, tc.expected, actual) - // test if there was a new array allocated or not - require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents))) - } - - var testCases = []testCase{ - { - name: "all events match, no key or namespace", - req: SubscribeRequest{Topic: testTopic}, - events: []Event{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 102, - Payload: PayloadEvents{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}}, - expectedCap: 5, - }, - { - name: "all events match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 103, - Payload: PayloadEvents{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}}, - expectedCap: 5, - }, - { - name: "all events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}}, - expectedCap: 5, - }, - { - name: "some evens match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 104), - newSimpleEvent("Other", 104), - newSimpleEvent("Same", 104)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 104, - Payload: PayloadEvents{ - newSimpleEvent("Same", 104), - newSimpleEvent("Same", 104)}}, - expectedCap: 2, - }, - { - name: "some events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "apps"), - newNSEvent("db1", "dbs"), - newNSEvent("app2", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("app1", "apps"), - newNSEvent("app2", "apps")}}, - expectedCap: 2, - }, - { - name: "no events match key", - req: SubscribeRequest{Topic: testTopic, Key: "Other"}, - events: []Event{ - newSimpleEvent("Same", 0), - newSimpleEvent("Same", 0)}, - }, - { - name: "no events match namespace", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "group1"), - newNSEvent("app2", "group2")}, - expectEvent: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fn(t, tc) - }) - } -} - -func newNSEvent(key, namespace string) Event { - return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} -} - -type nsPayload struct { - key string - namespace string - value string -} - -func (p nsPayload) FilterByKey(key, namespace string) bool { - return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) + e := newEventFromBatch(SubscribeRequest{}, []Event{first}) + require.Equal(t, first, e) + }) + t.Run("many items", func(t *testing.T) { + events := []Event{ + newSimpleEvent("foo", 9999), + newSimpleEvent("foo", 9999), + newSimpleEvent("zee", 9999), + } + req := SubscribeRequest{Topic: testTopic} + e := newEventFromBatch(req, events) + expected := Event{ + Topic: testTopic, + Index: 9999, + Payload: NewPayloadEvents(events...), + } + require.Equal(t, expected, e) + }) } From d4cd2fa6a8732007501b1dc95b4dff4c28e23b48 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 5 Nov 2020 17:57:25 -0500 Subject: [PATCH 3/5] stream: Add HasReadPermission to Payload Required now that filter is a method on PayloadEvents instead of Event --- agent/consul/state/catalog_events.go | 6 +++ agent/consul/state/store_integration_test.go | 4 ++ agent/consul/stream/event.go | 39 ++++++++++++++++---- agent/consul/stream/event_publisher_test.go | 11 +++++- agent/consul/stream/event_test.go | 29 +++++++++++++++ agent/rpc/subscribe/auth.go | 22 ----------- agent/rpc/subscribe/logger.go | 14 +++++-- agent/rpc/subscribe/subscribe.go | 10 ++--- agent/rpc/subscribe/subscribe_test.go | 9 ++--- 9 files changed, 99 insertions(+), 45 deletions(-) delete mode 100644 agent/rpc/subscribe/auth.go diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 0a11a9436..2a410d299 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -3,6 +3,8 @@ package state import ( memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" @@ -19,6 +21,10 @@ type EventPayloadCheckServiceNode struct { key string } +func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { + return e.Value.CanRead(authz) == acl.Allow +} + func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { if key == "" && namespace == "" { return true diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index d75512195..520913807 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -414,6 +414,10 @@ func (p nodePayload) FilterByKey(key, _ string) bool { return p.key == key } +func (p nodePayload) HasReadPermission(acl.Authorizer) bool { + return true +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 09369920b..16a33fca7 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,7 +4,11 @@ to the state store. */ package stream -import "fmt" +import ( + "fmt" + + "github.com/hashicorp/consul/acl" +) // Topic is an identifier that partitions events. A subscription will only receive // events which match the Topic. @@ -26,6 +30,11 @@ type Payload interface { // subscription. // TODO: rename to MatchesKey FilterByKey(key, namespace string) bool + + // HasReadPermission uses the acl.Authorizer to determine if the items in the + // Payload are visible to the request. It returns true if the payload is + // authorized for Read, otherwise returns false. + HasReadPermission(authz acl.Authorizer) bool } // PayloadEvents is an Payload which contains multiple Events. @@ -75,6 +84,12 @@ func (p *PayloadEvents) Len() int { return len(p.Items) } +func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool { + return p.filter(func(event Event) bool { + return event.Payload.HasReadPermission(authz) + }) +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. @@ -89,24 +104,34 @@ func (e Event) IsNewSnapshotToFollow() bool { return e.Payload == newSnapshotToFollow{} } -type endOfSnapshot struct{} +type framingEvent struct{} -func (endOfSnapshot) FilterByKey(string, string) bool { +func (framingEvent) FilterByKey(string, string) bool { return true } -type newSnapshotToFollow struct{} - -func (newSnapshotToFollow) FilterByKey(string, string) bool { +func (framingEvent) HasReadPermission(acl.Authorizer) bool { return true } +type endOfSnapshot struct { + framingEvent +} + +type newSnapshotToFollow struct { + framingEvent +} + type closeSubscriptionPayload struct { tokensSecretIDs []string } func (closeSubscriptionPayload) FilterByKey(string, string) bool { - return true + return false +} + +func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool { + return false } // NewCloseSubscriptionEvent returns a special Event that is handled by the diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index f2a9e43a3..c9f91fd3d 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" ) type intTopic int @@ -63,8 +65,9 @@ var testSnapshotEvent = Event{ } type simplePayload struct { - key string - value string + key string + value string + noReadPerm bool } func (p simplePayload) FilterByKey(key, _ string) bool { @@ -74,6 +77,10 @@ func (p simplePayload) FilterByKey(key, _ string) bool { return p.key == key } +func (p simplePayload) HasReadPermission(acl.Authorizer) bool { + return !p.noReadPerm +} + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index f3daba802..5a9cccea8 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -138,6 +138,7 @@ func newNSEvent(key, namespace string) Event { } type nsPayload struct { + framingEvent key string namespace string value string @@ -146,3 +147,31 @@ type nsPayload struct { func (p nsPayload) FilterByKey(key, namespace string) bool { return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) } + +func TestPayloadEvents_HasReadPermission(t *testing.T) { + t.Run("some events filtered", func(t *testing.T) { + ep := NewPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: false}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: false}}) + + require.True(t, ep.HasReadPermission(nil)) + expected := []Event{ + {Payload: simplePayload{key: "two"}}, + {Payload: simplePayload{key: "four"}}, + } + require.Equal(t, expected, ep.Items) + }) + + t.Run("all events filtered", func(t *testing.T) { + ep := NewPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: true}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: true}}) + + require.False(t, ep.HasReadPermission(nil)) + }) + +} diff --git a/agent/rpc/subscribe/auth.go b/agent/rpc/subscribe/auth.go deleted file mode 100644 index b41b1fdc4..000000000 --- a/agent/rpc/subscribe/auth.go +++ /dev/null @@ -1,22 +0,0 @@ -package subscribe - -import ( - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/consul/stream" -) - -// EnforceACL takes an acl.Authorizer and returns the decision for whether the -// event is allowed to be sent to this client or not. -func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision { - switch { - case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow(): - return acl.Allow - } - - switch p := e.Payload.(type) { - case state.EventPayloadCheckServiceNode: - return p.Value.CanRead(authz) - } - return acl.Deny -} diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index ddddb20ca..99394f546 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -58,12 +58,20 @@ func (l *eventLogger) Trace(e stream.Event) { case e.IsEndOfSnapshot(): l.snapshotDone = true l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count) + return case e.IsNewSnapshotToFollow(): l.logger.Trace("starting new snapshot", "sent", l.count) return - case l.snapshotDone: - l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len()) } - l.count += uint64(e.Len()) + size := 1 + if l, ok := e.Payload.(length); ok { + size = l.Len() + } + l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", size) + l.count += uint64(size) +} + +type length interface { + Len() int } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 71919baba..0e98893f9 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -132,10 +132,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) if authz == nil { return event, true } - fn := func(e stream.Event) bool { - return enforceACL(authz, e) == acl.Allow - } - return event.Filter(fn) + + return event, event.Payload.HasReadPermission(authz) } func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { @@ -154,10 +152,10 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { func setPayload(e *pbsubscribe.Event, payload stream.Payload) { switch p := payload.(type) { - case stream.PayloadEvents: + case *stream.PayloadEvents: e.Payload = &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ - Events: batchEventsFromEventSlice(p), + Events: batchEventsFromEventSlice(p.Items), }, } case state.EventPayloadCheckServiceNode: diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index bc41ed1e8..fe3a07325 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -917,8 +917,8 @@ func TestNewEventFromSteamEvent(t *testing.T) { name: "event batch", event: stream.Event{ Index: 2002, - Payload: stream.PayloadEvents{ - { + Payload: stream.NewPayloadEvents( + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -928,7 +928,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, }, - { + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, @@ -937,8 +937,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { Service: &structs.NodeService{Service: "web1"}, }, }, - }, - }, + }), }, expected: pbsubscribe.Event{ Index: 2002, From 4fc073b1f47f6de844b67fea587b972a23b22057 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 5 Nov 2020 19:18:37 -0500 Subject: [PATCH 4/5] stream: rename FilterByKey --- agent/consul/state/catalog_events.go | 3 +-- agent/consul/state/catalog_events_test.go | 2 +- agent/consul/state/store_integration_test.go | 2 +- agent/consul/stream/event.go | 13 ++++++------- agent/consul/stream/event_publisher_test.go | 2 +- agent/consul/stream/event_test.go | 4 ++-- agent/consul/stream/subscription.go | 2 +- 7 files changed, 13 insertions(+), 15 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 2a410d299..e9968eb64 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -4,7 +4,6 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" @@ -25,7 +24,7 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo return e.Value.CanRead(authz) == acl.Allow } -func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { +func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { if key == "" && namespace == "" { return true } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 1efead52a..fb2818c58 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1476,7 +1476,7 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { t.Skip("cant test namespace matching without namespace support") } - require.Equal(t, tc.expected, tc.payload.FilterByKey(tc.key, tc.namespace)) + require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace)) } var testCases = []testCase{ diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 520913807..fc4d05591 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -410,7 +410,7 @@ type nodePayload struct { node *structs.ServiceNode } -func (p nodePayload) FilterByKey(key, _ string) bool { +func (p nodePayload) MatchesKey(key, _ string) bool { return p.key == key } diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 16a33fca7..1ee316d09 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -23,13 +23,12 @@ type Event struct { } type Payload interface { - // FilterByKey must return true if the Payload should be included in a subscription + // MatchesKey must return true if the Payload should be included in a subscription // requested with the key and namespace. // Generally this means that the payload matches the key and namespace or // the payload is a special framing event that should be returned to every // subscription. - // TODO: rename to MatchesKey - FilterByKey(key, namespace string) bool + MatchesKey(key, namespace string) bool // HasReadPermission uses the acl.Authorizer to determine if the items in the // Payload are visible to the request. It returns true if the payload is @@ -74,9 +73,9 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool { return true } -func (p *PayloadEvents) FilterByKey(key, namespace string) bool { +func (p *PayloadEvents) MatchesKey(key, namespace string) bool { return p.filter(func(event Event) bool { - return event.Payload.FilterByKey(key, namespace) + return event.Payload.MatchesKey(key, namespace) }) } @@ -106,7 +105,7 @@ func (e Event) IsNewSnapshotToFollow() bool { type framingEvent struct{} -func (framingEvent) FilterByKey(string, string) bool { +func (framingEvent) MatchesKey(string, string) bool { return true } @@ -126,7 +125,7 @@ type closeSubscriptionPayload struct { tokensSecretIDs []string } -func (closeSubscriptionPayload) FilterByKey(string, string) bool { +func (closeSubscriptionPayload) MatchesKey(string, string) bool { return false } diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index c9f91fd3d..576d4ccc3 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -70,7 +70,7 @@ type simplePayload struct { noReadPerm bool } -func (p simplePayload) FilterByKey(key, _ string) bool { +func (p simplePayload) MatchesKey(key, _ string) bool { if key == "" { return true } diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index 5a9cccea8..e18640d65 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -35,7 +35,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { events = append(events, tc.events...) pe := &PayloadEvents{Items: events} - ok := pe.FilterByKey(tc.req.Key, tc.req.Namespace) + ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace) require.Equal(t, tc.expectEvent, ok) if !tc.expectEvent { return @@ -144,7 +144,7 @@ type nsPayload struct { value string } -func (p nsPayload) FilterByKey(key, namespace string) bool { +func (p nsPayload) MatchesKey(key, namespace string) bool { return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 5dc45efdf..795785e42 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -102,7 +102,7 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) { continue } event := newEventFromBatch(s.req, next.Events) - if !event.Payload.FilterByKey(s.req.Key, s.req.Namespace) { + if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace) { continue } return event, nil From e4a78c977d9e0a573493488206914b614ee978fe Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 6 Nov 2020 13:00:33 -0500 Subject: [PATCH 5/5] stream: document that Payload must be immutable If they are sent to EventPublisher.Publish. Also document that PayloadEvents is expected to come from a subscription and that it is not immutable. --- agent/consul/state/catalog_events.go | 4 ++++ agent/consul/stream/event.go | 14 ++++++++++++-- agent/consul/stream/event_publisher.go | 3 ++- agent/consul/stream/event_test.go | 14 +++++++------- agent/consul/stream/subscription.go | 2 +- agent/consul/stream/subscription_test.go | 2 +- agent/rpc/subscribe/subscribe_test.go | 6 +++++- 7 files changed, 32 insertions(+), 13 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index e9968eb64..526eca354 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -11,6 +11,10 @@ import ( // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to // indicates changes to a CheckServiceNode for service health. +// +// The stream.Payload methods implemented by EventPayloadCheckServiceNode are +// do not mutate the payload, making it safe to use in an Event sent to +// stream.EventPublisher.Publish. type EventPayloadCheckServiceNode struct { Op pbsubscribe.CatalogOp Value *structs.CheckServiceNode diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 1ee316d09..74df46b5e 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -22,6 +22,9 @@ type Event struct { Payload Payload } +// A Payload contains the topic-specific data in an event. The payload methods +// should not modify the state of the payload if the Event is being submitted to +// EventPublisher.Publish. type Payload interface { // MatchesKey must return true if the Payload should be included in a subscription // requested with the key and namespace. @@ -36,12 +39,16 @@ type Payload interface { HasReadPermission(authz acl.Authorizer) bool } -// PayloadEvents is an Payload which contains multiple Events. +// PayloadEvents is a Payload that may be returned by Subscription.Next when +// there are multiple events at an index. +// +// Note that unlike most other Payload, PayloadEvents is mutable and it is NOT +// safe to send to EventPublisher.Publish. type PayloadEvents struct { Items []Event } -func NewPayloadEvents(items ...Event) *PayloadEvents { +func newPayloadEvents(items ...Event) *PayloadEvents { return &PayloadEvents{Items: items} } @@ -73,6 +80,7 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool { return true } +// MatchesKey filters the PayloadEvents to those which match the key and namespace. func (p *PayloadEvents) MatchesKey(key, namespace string) bool { return p.filter(func(event Event) bool { return event.Payload.MatchesKey(key, namespace) @@ -83,6 +91,8 @@ func (p *PayloadEvents) Len() int { return len(p.Items) } +// HasReadPermission filters the PayloadEvents to those which are authorized +// for reading by authz. func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool { return p.filter(func(event Event) bool { return event.Payload.HasReadPermission(authz) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 379bfdfa8..769e875d8 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -91,7 +91,8 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E return e } -// Publish events to all subscribers of the event Topic. +// Publish events to all subscribers of the event Topic. The events will be shared +// with all subscriptions, so the Payload used in Event.Payload must be immutable. func (e *EventPublisher) Publish(events []Event) { if len(events) > 0 { e.publishCh <- events diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index e18640d65..8b36ee8d1 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -54,7 +54,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { newSimpleEvent("One", 102), newSimpleEvent("Two", 102)}, expectEvent: true, - expected: NewPayloadEvents( + expected: newPayloadEvents( newSimpleEvent("One", 102), newSimpleEvent("Two", 102)), expectedCap: 5, @@ -66,7 +66,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { newSimpleEvent("Same", 103), newSimpleEvent("Same", 103)}, expectEvent: true, - expected: NewPayloadEvents( + expected: newPayloadEvents( newSimpleEvent("Same", 103), newSimpleEvent("Same", 103)), expectedCap: 5, @@ -78,7 +78,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { newNSEvent("Something", "apps"), newNSEvent("Other", "apps")}, expectEvent: true, - expected: NewPayloadEvents( + expected: newPayloadEvents( newNSEvent("Something", "apps"), newNSEvent("Other", "apps")), expectedCap: 5, @@ -91,7 +91,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { newSimpleEvent("Other", 104), newSimpleEvent("Same", 104)}, expectEvent: true, - expected: NewPayloadEvents( + expected: newPayloadEvents( newSimpleEvent("Same", 104), newSimpleEvent("Same", 104)), expectedCap: 2, @@ -104,7 +104,7 @@ func TestPayloadEvents_FilterByKey(t *testing.T) { newNSEvent("db1", "dbs"), newNSEvent("app2", "apps")}, expectEvent: true, - expected: NewPayloadEvents( + expected: newPayloadEvents( newNSEvent("app1", "apps"), newNSEvent("app2", "apps")), expectedCap: 2, @@ -150,7 +150,7 @@ func (p nsPayload) MatchesKey(key, namespace string) bool { func TestPayloadEvents_HasReadPermission(t *testing.T) { t.Run("some events filtered", func(t *testing.T) { - ep := NewPayloadEvents( + ep := newPayloadEvents( Event{Payload: simplePayload{key: "one", noReadPerm: true}}, Event{Payload: simplePayload{key: "two", noReadPerm: false}}, Event{Payload: simplePayload{key: "three", noReadPerm: true}}, @@ -165,7 +165,7 @@ func TestPayloadEvents_HasReadPermission(t *testing.T) { }) t.Run("all events filtered", func(t *testing.T) { - ep := NewPayloadEvents( + ep := newPayloadEvents( Event{Payload: simplePayload{key: "one", noReadPerm: true}}, Event{Payload: simplePayload{key: "two", noReadPerm: true}}, Event{Payload: simplePayload{key: "three", noReadPerm: true}}, diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 795785e42..03069ea93 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -128,7 +128,7 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { return Event{ Topic: req.Topic, Index: first.Index, - Payload: NewPayloadEvents(events...), + Payload: newPayloadEvents(events...), } } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index bec2538cb..02368f61d 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -159,7 +159,7 @@ func TestNewEventsFromBatch(t *testing.T) { expected := Event{ Topic: testTopic, Index: 9999, - Payload: NewPayloadEvents(events...), + Payload: newPayloadEvents(events...), } require.Equal(t, expected, e) }) diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index fe3a07325..cf37e75bb 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -917,7 +917,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { name: "event batch", event: stream.Event{ Index: 2002, - Payload: stream.NewPayloadEvents( + Payload: newPayloadEvents( stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ @@ -1007,6 +1007,10 @@ func TestNewEventFromSteamEvent(t *testing.T) { } } +func newPayloadEvents(items ...stream.Event) *stream.PayloadEvents { + return &stream.PayloadEvents{Items: items} +} + // newEventFromSubscription is used to return framing events. EndOfSnapshot and // NewSnapshotToFollow are not exported, but we can get them from a subscription. func newEventFromSubscription(t *testing.T, index uint64) stream.Event {