Merge pull request #9061 from hashicorp/dnephin/event-fields

stream: support filtering by namespace
This commit is contained in:
Daniel Nephin 2020-11-05 14:18:35 -05:00 committed by GitHub
commit cd220e5d6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 322 additions and 341 deletions

View File

@ -9,17 +9,15 @@ import (
"github.com/hashicorp/consul/types"
)
func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event {
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
@ -37,8 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -117,8 +113,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -164,7 +158,6 @@ func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Topic: first.Topic,
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},

View File

@ -26,7 +26,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1))
client.QueueEvents(newEndOfSnapshotEvent(1))
opts := cache.FetchOptions{
MinIndex: 0,
@ -230,7 +230,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(5, 1),
registerServiceWeb(5, 2),
registerServiceWeb(5, 3),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
@ -301,7 +301,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4),
registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
newEndOfSnapshotEvent(50))
// Make another blocking query with THE SAME index. It should immediately
// return the new snapshot.
@ -324,11 +324,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client.QueueErr(tempError("temporary connection error"))
client.QueueEvents(
newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth),
newNewSnapshotToFollowEvent(),
registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4),
registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))
newEndOfSnapshotEvent(50))
start := time.Now()
opts.MinIndex = 49
@ -358,7 +358,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents(
batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
@ -428,7 +428,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents(
batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))
newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{

View File

@ -13,6 +13,26 @@ import (
type EventPayloadCheckServiceNode struct {
Op pbsubscribe.CatalogOp
Value *structs.CheckServiceNode
// key is used to override the key used to filter the payload. It is set for
// events in the connect topic to specify the name of the underlying service
// when the change event is for a sidecar or gateway.
key string
}
func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool {
if key == "" && namespace == "" {
return true
}
if e.Value.Service == nil {
return false
}
name := e.Value.Service.Service
if e.key != "" {
name = e.key
}
return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace()
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -42,10 +62,6 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
},
}
if n.Service != nil {
event.Key = n.Service.Service
}
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
@ -252,7 +268,9 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S
e := newServiceHealthEventDeregister(idx, before)
e.Topic = topicServiceHealthConnect
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = payload.Value.Service.Proxy.DestinationServiceName
e.Payload = payload
return e, true
}
@ -304,7 +322,9 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindConnectProxy:
connectEvent.Key = node.Service.Proxy.DestinationServiceName
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.key = node.Service.Proxy.DestinationServiceName
connectEvent.Payload = payload
result = append(result, connectEvent)
default:
@ -316,7 +336,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
return result
}
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode {
ep, ok := payload.(EventPayloadCheckServiceNode)
if !ok {
return nil
@ -431,7 +451,6 @@ func newServiceHealthEventRegister(
}
return stream.Event{
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -458,7 +477,6 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream
return stream.Event{
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,

View File

@ -4,12 +4,15 @@ import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
)
func TestServiceHealthEventsFromChanges(t *testing.T) {
@ -819,6 +822,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return nil
},
WantEvents: []stream.Event{
// We should see:
// - service dereg for web and proxy on node2
@ -829,29 +833,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
// - connect reg for api on node2
testServiceHealthDeregistrationEvent(t, "web", evNode2),
testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar),
testServiceHealthDeregistrationEvent(t, "web",
evConnectTopic,
evNode2,
evSidecar,
),
testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar),
testServiceHealthEvent(t, "web", evNodeUnchanged),
testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged),
testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged),
testServiceHealthEvent(t, "api",
evNode2,
evConnectNative,
evNodeUnchanged,
),
testServiceHealthEvent(t, "api",
evNode2,
evConnectTopic,
evConnectNative,
evNodeUnchanged,
),
testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged),
testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged),
},
WantErr: false,
},
}
@ -884,17 +874,36 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
}
require.NoError(t, err)
// Make sure we have the right events, only taking ordering into account
// where it matters to account for non-determinism.
requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e stream.Event) string {
// We need events affecting unique registrations to be ordered, within a topic
csn := getPayloadCheckServiceNode(e.Payload)
return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service)
})
assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents)
})
}
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}
// cmpPartialOrderEvents returns a compare option which sorts events so that
// all events for a particular node/service are grouped together. The sort is
// stable so events with the same node/service retain their relative order.
var cmpPartialOrderEvents = cmp.Options{
cmpopts.SortSlices(func(i, j stream.Event) bool {
key := func(e stream.Event) string {
csn := getPayloadCheckServiceNode(e.Payload)
return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service)
}
return key(i) < key(j)
}),
cmpEvents,
}
var cmpEvents = cmp.Options{
cmp.AllowUnexported(EventPayloadCheckServiceNode{}),
}
type regOption func(req *structs.RegisterRequest) error
func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequest {
@ -1170,10 +1179,10 @@ func evSidecar(e *stream.Event) error {
csn.Checks[1].ServiceName = svc + "_sidecar_proxy"
}
// Update event key to be the proxy service name, but only if this is not
// already in the connect topic
if e.Topic != topicServiceHealthConnect {
e.Key = csn.Service.Service
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = svc
e.Payload = payload
}
return nil
}
@ -1242,15 +1251,13 @@ func evChecksUnchanged(e *stream.Event) error {
// name but not ID simulating an in-place service rename.
func evRenameService(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy
if !isSidecar {
if csn.Service.Kind != structs.ServiceKindConnectProxy {
csn.Service.Service += "_changed"
// Update service checks
if len(csn.Checks) >= 2 {
csn.Checks[1].ServiceName += "_changed"
}
e.Key += "_changed"
return nil
}
// This is a sidecar, it's not really realistic but lets only update the
@ -1258,12 +1265,13 @@ func evRenameService(e *stream.Event) error {
// we get the right result. This is certainly possible if not likely so a
// valid case.
// We don't need to update out own details, only the name of the destination
// We don't need to update our own details, only the name of the destination
csn.Service.Proxy.DestinationServiceName += "_changed"
// If this is the connect topic we need to change the key too
if e.Topic == topicServiceHealthConnect {
e.Key += "_changed"
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = csn.Service.Proxy.DestinationServiceName
e.Payload = payload
}
return nil
}
@ -1337,48 +1345,6 @@ func evServiceCheckDelete(e *stream.Event) error {
return nil
}
// requireEventsInCorrectPartialOrder compares that the expected set of events
// was emitted. It allows for _independent_ events to be emitted in any order -
// this can be important because even though the transaction processing is all
// strictly ordered up until the processing func, grouping multiple updates that
// affect the same logical entity may be necessary and may impose random
// ordering changes on the eventual events if a map is used. We only care that
// events _affecting the same topic and key_ are ordered correctly with respect
// to the "expected" set of events so this helper asserts that.
//
// The caller provides a func that can return a partition key for the given
// event types and we assert that all events with the same partition key are
// deliveries in the same order. Note that this is not necessarily the same as
// topic/key since for example in Catalog only events about a specific service
// _instance_ need to be ordered while topic and key are more general.
func requireEventsInCorrectPartialOrder(t *testing.T, want, got []stream.Event,
partKey func(stream.Event) string) {
t.Helper()
// Partion both arrays by topic/key
wantParts := make(map[string][]stream.Event)
gotParts := make(map[string][]stream.Event)
for _, e := range want {
k := partKey(e)
wantParts[k] = append(wantParts[k], e)
}
for _, e := range got {
k := partKey(e)
gotParts[k] = append(gotParts[k], e)
}
for k, want := range wantParts {
require.Equal(t, want, gotParts[k], "got incorrect events for partition: %s", k)
}
for k, got := range gotParts {
if _, ok := wantParts[k]; !ok {
require.Equal(t, nil, got, "got unwanted events for partition: %s", k)
}
}
}
// newTestEventServiceHealthRegister returns a realistically populated service
// health registration event. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
@ -1393,7 +1359,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
return stream.Event{
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -1464,7 +1429,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event {
return stream.Event{
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,

View File

@ -395,9 +395,8 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
Payload: nodePayload{node: node, key: req.Key},
}
snap.Append([]stream.Event{event})
}
@ -406,6 +405,15 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
}
}
type nodePayload struct {
key string
node *structs.ServiceNode
}
func (p nodePayload) FilterByKey(key, _ string) bool {
return p.key == key
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",

View File

@ -14,15 +14,23 @@ type Topic fmt.Stringer
// EventPublisher and returned to Subscribers.
type Event struct {
Topic Topic
Key string
Index uint64
Payload interface{}
Payload Payload
}
type Payload interface {
// FilterByKey must return true if the Payload should be included in a subscription
// requested with the key and namespace.
// Generally this means that the payload matches the key and namespace or
// the payload is a special framing event that should be returned to every
// subscription.
FilterByKey(key, namespace string) bool
}
// Len returns the number of events contained within this event. If the Payload
// is a []Event, the length of that slice is returned. Otherwise 1 is returned.
func (e Event) Len() int {
if batch, ok := e.Payload.([]Event); ok {
if batch, ok := e.Payload.(PayloadEvents); ok {
return len(batch)
}
return 1
@ -31,7 +39,7 @@ func (e Event) Len() int {
// Filter returns an Event filtered to only those Events where f returns true.
// If the second return value is false, every Event was removed by the filter.
func (e Event) Filter(f func(Event) bool) (Event, bool) {
batch, ok := e.Payload.([]Event)
batch, ok := e.Payload.(PayloadEvents)
if !ok {
return e, f(e)
}
@ -50,7 +58,7 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) {
return e, size != 0
}
filtered := make([]Event, 0, size)
filtered := make(PayloadEvents, 0, size)
for idx := range batch {
event := batch[idx]
if f(event) {
@ -64,6 +72,20 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) {
return e, true
}
// PayloadEvents is an Payload which contains multiple Events.
type PayloadEvents []Event
// TODO: this method is not called, but needs to exist so that we can store
// a slice of events as a payload. In the future we should be able to refactor
// Event.Filter so that this FilterByKey includes the re-slicing.
func (e PayloadEvents) FilterByKey(_, _ string) bool {
return true
}
func (e PayloadEvents) Events() []Event {
return e
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur.
@ -80,12 +102,24 @@ func (e Event) IsNewSnapshotToFollow() bool {
type endOfSnapshot struct{}
func (endOfSnapshot) FilterByKey(string, string) bool {
return true
}
type newSnapshotToFollow struct{}
func (newSnapshotToFollow) FilterByKey(string, string) bool {
return true
}
type closeSubscriptionPayload struct {
tokensSecretIDs []string
}
func (closeSubscriptionPayload) FilterByKey(string, string) bool {
return true
}
// NewCloseSubscriptionEvent returns a special Event that is handled by the
// stream package, and is never sent to subscribers. EventProcessor handles
// these events, and closes any subscriptions which were created using a token

View File

@ -185,7 +185,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Payload: newSnapshotToFollow{},
}})

View File

@ -43,24 +43,37 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
}}
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvent(t, eventCh)
expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}
expected := Event{
Topic: testTopic,
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
}
require.Equal(t, expected, next)
}
var testSnapshotEvent = Event{
Topic: testTopic,
Payload: "snapshot-event-payload",
Key: "sub-key",
Payload: simplePayload{key: "sub-key", value: "snapshot-event-payload"},
Index: 1,
}
type simplePayload struct {
key string
value string
}
func (p simplePayload) FilterByKey(key, _ string) bool {
if key == "" {
return true
}
return p.key == key
}
func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
@ -193,8 +206,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
expected := Event{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
Index: 3,
}
publisher.Publish([]Event{expected})
@ -243,9 +255,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
expected := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
publisher.publishEvent([]Event{expected})
@ -284,9 +295,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
@ -341,9 +351,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {

View File

@ -37,7 +37,6 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t
}
s.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})

View File

@ -129,9 +129,9 @@ func TestEventSnapshot(t *testing.T) {
e := curItem.Events[0]
switch {
case snapDone:
payload, ok := e.Payload.(string)
payload, ok := e.Payload.(simplePayload)
require.True(t, ok, "want health event got: %#v", e.Payload)
updateIDs = append(updateIDs, payload)
updateIDs = append(updateIDs, payload.value)
if len(updateIDs) == tc.updatesAfterSnap {
// We're done!
break RECV
@ -139,9 +139,9 @@ func TestEventSnapshot(t *testing.T) {
case e.IsEndOfSnapshot():
snapDone = true
default:
payload, ok := e.Payload.(string)
payload, ok := e.Payload.(simplePayload)
require.True(t, ok, "want health event got: %#v", e.Payload)
snapIDs = append(snapIDs, payload)
snapIDs = append(snapIDs, payload.value)
}
}
@ -176,6 +176,6 @@ func newDefaultHealthEvent(index uint64, n int) Event {
return Event{
Index: index,
Topic: testTopic,
Payload: fmt.Sprintf("test-event-%03d", n),
Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)},
}
}

View File

@ -53,9 +53,21 @@ type Subscription struct {
// SubscribeRequest identifies the types of events the subscriber would like to
// receiver. Topic and Token are required.
type SubscribeRequest struct {
// Topic to subscribe to
Topic Topic
Key string
// Key used to filter events in the topic. Only events matching the key will
// be returned by the subscription. A blank key will return all events. Key
// is generally the name of the resource.
Key string
// Namespace used to filter events in the topic. Only events matching the
// namespace will be returned by the subscription.
Namespace string
// Token that was used to authenticate the request. If any ACL policy
// changes impact the token the subscription will be forcefully closed.
Token string
// Index is the last index the client received. If non-zero the
// subscription will be resumed from this index. If the index is out-of-date
// a NewSnapshotToFollow event will be sent.
Index uint64
}
@ -115,9 +127,8 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
}
return Event{
Topic: req.Topic,
Key: req.Key,
Index: first.Index,
Payload: events,
Payload: PayloadEvents(events),
}
}
@ -128,7 +139,7 @@ func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
}
fn := func(e Event) bool {
return req.Key == e.Key
return e.Payload.FilterByKey(req.Key, req.Namespace)
}
return event.Filter(fn)
}

View File

@ -69,7 +69,7 @@ func TestSubscription(t *testing.T) {
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Equal(t, index, got.Index)
require.Equal(t, "test", got.Key)
require.Equal(t, "test", got.Payload.(simplePayload).key)
// Cancelling the subscription context should unblock Next
start = time.Now()
@ -130,20 +130,17 @@ func TestSubscription_Close(t *testing.T) {
}
func publishTestEvent(index uint64, b *eventBuffer, key string) {
// Don't care about the event payload for now just the semantics of publishing
// something. This is not a valid stream in the end-to-end streaming protocol
// but enough to test subscription mechanics.
e := Event{
Index: index,
Topic: testTopic,
Key: key,
Index: index,
Topic: testTopic,
Payload: simplePayload{key: key},
}
b.Append([]Event{e})
}
func TestFilter_NoKey(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"})
events := make(PayloadEvents, 0, 5)
events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102))
req := SubscribeRequest{Topic: testTopic}
actual, ok := filterByKey(req, events)
@ -151,26 +148,33 @@ func TestFilter_NoKey(t *testing.T) {
require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual.Payload.([]Event)), 5)
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5)
}
func newSimpleEvent(key string, index uint64) Event {
return Event{Index: index, Payload: simplePayload{key: key}}
}
func TestFilter_WithKey_AllEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"})
events := make(PayloadEvents, 0, 5)
events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103))
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
require.True(t, ok)
expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events}
expected := Event{Topic: testTopic, Index: 103, Payload: events}
require.Equal(t, expected, actual)
// test that a new array was not allocated
require.Equal(t, 5, cap(actual.Payload.([]Event)))
require.Equal(t, 5, cap(actual.Payload.(PayloadEvents)))
}
func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"})
events = append(events,
newSimpleEvent("Same", 104),
newSimpleEvent("Other", 0),
newSimpleEvent("Same", 0))
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
@ -178,18 +182,17 @@ func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
expected := Event{
Topic: testTopic,
Index: 104,
Key: "Same",
Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}},
Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)},
}
require.Equal(t, expected, actual)
// test that a new array was allocated with the correct size
require.Equal(t, cap(actual.Payload.([]Event)), 2)
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2)
}
func TestFilter_WithKey_NoEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0))
req := SubscribeRequest{Topic: testTopic, Key: "Other"}
_, ok := filterByKey(req, events)

View File

@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
}
elog.Trace(event)
e := newEventFromStreamEvent(req.Topic, event)
e := newEventFromStreamEvent(event)
if err := serverStream.Send(e); err != nil {
return err
}
@ -139,12 +139,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
return event.Filter(fn)
}
func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{
Topic: topic,
Key: event.Key,
Index: event.Index,
}
func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{Index: event.Index}
switch {
case event.IsEndOfSnapshot():
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
@ -157,9 +153,9 @@ func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsub
return e
}
func setPayload(e *pbsubscribe.Event, payload interface{}) {
func setPayload(e *pbsubscribe.Event, payload stream.Payload) {
switch p := payload.(type) {
case []stream.Event:
case stream.PayloadEvents:
e.Payload = &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: batchEventsFromEventSlice(p),
@ -182,7 +178,7 @@ func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event {
result := make([]*pbsubscribe.Event, len(events))
for i := range events {
event := events[i]
result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index}
result[i] = &pbsubscribe.Event{Index: event.Index}
setPayload(result[i], event.Payload)
}
return result

View File

@ -107,8 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -139,8 +137,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
},
},
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -171,8 +167,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
},
},
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
@ -192,8 +186,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -460,8 +452,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -492,8 +482,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
},
},
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -524,8 +512,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
},
},
{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
@ -545,8 +531,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -902,11 +886,9 @@ func TestNewEventFromSteamEvent(t *testing.T) {
expected pbsubscribe.Event
}
testTopic := pbsubscribe.Topic_ServiceHealthConnect
fn := func(t *testing.T, tc testCase) {
expected := tc.expected
expected.Topic = testTopic
actual := newEventFromStreamEvent(testTopic, tc.event)
actual := newEventFromStreamEvent(tc.event)
assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty())
}
@ -929,11 +911,9 @@ func TestNewEventFromSteamEvent(t *testing.T) {
{
name: "event batch",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: []stream.Event{
Payload: stream.PayloadEvents{
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -944,7 +924,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
@ -957,13 +936,11 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: []*pbsubscribe.Event{
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -976,7 +953,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -996,7 +972,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
{
name: "event payload CheckServiceNode",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -1007,7 +982,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{

View File

@ -74,6 +74,11 @@ func (_ *EnterpriseMeta) FillAuthzContext(_ *acl.AuthorizerContext) {}
func (_ *EnterpriseMeta) Normalize() {}
// GetNamespace always returns the empty string.
func (_ *EnterpriseMeta) GetNamespace() string {
return ""
}
// FillAuthzContext stub
func (_ *DirEntry) FillAuthzContext(_ *acl.AuthorizerContext) {}

View File

@ -108,7 +108,12 @@ type SubscribeRequest struct {
// If it's not the local DC the server will forward the request to
// the remote DC and proxy the results back to the subscriber. An empty
// string defaults to the local datacenter.
Datacenter string `protobuf:"bytes,5,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"`
Datacenter string `protobuf:"bytes,5,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"`
// Namespace which contains the resources. If Namespace is not specified the
// default namespace will be used.
//
// Namespace is an enterprise-only feature.
Namespace string `protobuf:"bytes,6,opt,name=Namespace,proto3" json:"Namespace,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -182,20 +187,23 @@ func (m *SubscribeRequest) GetDatacenter() string {
return ""
}
func (m *SubscribeRequest) GetNamespace() string {
if m != nil {
return m.Namespace
}
return ""
}
// Event describes a streaming update on a subscription. Events are used both to
// describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot.
type Event struct {
// Topic the event was published to
Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"`
// Key is the logical identifier for the entity that was mutated.
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
// Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients.
Index uint64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"`
Index uint64 `protobuf:"varint,1,opt,name=Index,proto3" json:"Index,omitempty"`
// Payload is the actual event content.
//
// Types that are valid to be assigned to Payload:
@ -249,13 +257,13 @@ type isEvent_Payload interface {
}
type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"`
EndOfSnapshot bool `protobuf:"varint,2,opt,name=EndOfSnapshot,proto3,oneof"`
}
type Event_NewSnapshotToFollow struct {
NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"`
NewSnapshotToFollow bool `protobuf:"varint,3,opt,name=NewSnapshotToFollow,proto3,oneof"`
}
type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"`
EventBatch *EventBatch `protobuf:"bytes,4,opt,name=EventBatch,proto3,oneof"`
}
type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
@ -273,20 +281,6 @@ func (m *Event) GetPayload() isEvent_Payload {
return nil
}
func (m *Event) GetTopic() Topic {
if m != nil {
return m.Topic
}
return Topic_Unknown
}
func (m *Event) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Event) GetIndex() uint64 {
if m != nil {
return m.Index
@ -341,17 +335,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
if x.EndOfSnapshot {
t = 1
}
_ = b.EncodeVarint(5<<3 | proto.WireVarint)
_ = b.EncodeVarint(2<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_NewSnapshotToFollow:
t := uint64(0)
if x.NewSnapshotToFollow {
t = 1
}
_ = b.EncodeVarint(6<<3 | proto.WireVarint)
_ = b.EncodeVarint(3<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_EventBatch:
_ = b.EncodeVarint(7<<3 | proto.WireBytes)
_ = b.EncodeVarint(4<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.EventBatch); err != nil {
return err
}
@ -370,21 +364,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*Event)
switch tag {
case 5: // Payload.EndOfSnapshot
case 2: // Payload.EndOfSnapshot
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err
case 6: // Payload.NewSnapshotToFollow
case 3: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err
case 7: // Payload.EventBatch
case 4: // Payload.EventBatch
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
@ -546,40 +540,41 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{
// 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75,
0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47,
0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf,
0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e,
0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0,
0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c,
0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed,
0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67,
0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87,
0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c,
0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79,
0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2,
0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc,
0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee,
0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c,
0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99,
0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1,
0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1,
0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a,
0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30,
0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad,
0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b,
0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50,
0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00,
0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a,
0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70,
0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96,
0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9,
0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8,
0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e,
0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00,
// 536 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0x65, 0x83, 0xad, 0x07, 0xb7, 0xd4, 0x3b, 0x8c, 0x0d, 0x33, 0x0d, 0x12, 0xb3, 0xe0,
0x12, 0xa9, 0xc1, 0x44, 0xdf, 0x34, 0xc2, 0x36, 0x31, 0x26, 0x60, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x48, 0x1b, 0xba, 0x7b, 0x6b, 0x7b, 0x19, 0xee, 0x5d, 0xbf, 0x83, 0x9f, 0xc4, 0xcf, 0xe0,
0xa3, 0x1f, 0xc1, 0xe0, 0x17, 0x31, 0x5c, 0x4a, 0x29, 0xb0, 0xb7, 0x9e, 0xdf, 0x9f, 0x73, 0x4f,
0xcf, 0x1f, 0x78, 0x1c, 0xc5, 0x42, 0x0a, 0x3b, 0x1a, 0x26, 0x93, 0x61, 0xe2, 0xc6, 0xc1, 0x10,
0xed, 0xec, 0xab, 0xa9, 0x38, 0xaa, 0x67, 0x40, 0xb5, 0x9a, 0xa9, 0x31, 0xbe, 0x09, 0x5c, 0xb4,
0xb9, 0xf0, 0x52, 0x59, 0xfd, 0x17, 0x01, 0x63, 0xb0, 0x54, 0x3a, 0xf8, 0x75, 0x82, 0x89, 0xa4,
0x27, 0x50, 0xbc, 0x14, 0x51, 0xe0, 0x9a, 0xa4, 0x46, 0x1a, 0x87, 0x2d, 0xa3, 0xb9, 0x4a, 0xae,
0x70, 0x67, 0x41, 0x53, 0x03, 0x76, 0x3e, 0xe0, 0xad, 0x59, 0xa8, 0x91, 0x86, 0xee, 0xcc, 0x3f,
0x69, 0x65, 0xee, 0x1c, 0x23, 0x37, 0x77, 0x14, 0xb6, 0x08, 0xe6, 0xe8, 0x7b, 0xee, 0xe1, 0x37,
0x73, 0xb7, 0x46, 0x1a, 0xbb, 0xce, 0x22, 0xa0, 0x16, 0xc0, 0x19, 0x93, 0xcc, 0x45, 0x2e, 0x31,
0x36, 0x8b, 0xca, 0x90, 0x43, 0xe8, 0x23, 0xd0, 0x7b, 0xec, 0x1a, 0x93, 0x88, 0xb9, 0x68, 0x96,
0x14, 0xbd, 0x02, 0xea, 0x3f, 0x0a, 0x50, 0x3c, 0xbf, 0x41, 0x2e, 0x57, 0xd9, 0x49, 0x3e, 0xfb,
0x09, 0x1c, 0x9c, 0x73, 0xaf, 0xff, 0x65, 0xc0, 0x59, 0x94, 0xf8, 0x42, 0xaa, 0x2a, 0xf7, 0xbb,
0x9a, 0xb3, 0x0e, 0xd3, 0x16, 0x1c, 0xf5, 0x70, 0xba, 0x0c, 0x2f, 0xc5, 0x85, 0x08, 0x43, 0x31,
0x55, 0xf5, 0xcf, 0xd5, 0x77, 0x91, 0xf4, 0x15, 0x80, 0x7a, 0xba, 0xcd, 0xa4, 0xeb, 0xab, 0x9f,
0x2a, 0xb7, 0x1e, 0xe4, 0x9a, 0xb4, 0x22, 0xbb, 0x9a, 0x93, 0x93, 0xd2, 0x0b, 0x38, 0x18, 0x2c,
0x66, 0xd0, 0x45, 0x16, 0x4a, 0xdf, 0x04, 0xe5, 0xb5, 0x72, 0xde, 0x35, 0xfe, 0x2a, 0xf2, 0x98,
0xc4, 0x79, 0xd1, 0x6b, 0x70, 0x5b, 0x87, 0xbd, 0x8f, 0xec, 0x36, 0x14, 0xcc, 0xab, 0xbf, 0xcc,
0xd7, 0x42, 0x1b, 0x50, 0x52, 0x51, 0x62, 0x92, 0xda, 0x4e, 0xa3, 0xbc, 0x36, 0x3a, 0x45, 0x38,
0x29, 0x5f, 0xff, 0x4e, 0xe0, 0xe8, 0x8e, 0xb7, 0xe8, 0x13, 0x28, 0xf4, 0xa3, 0x74, 0xf0, 0x95,
0x9c, 0xbb, 0xc3, 0x24, 0x0b, 0xc5, 0xa8, 0x1f, 0x39, 0x85, 0x7e, 0x44, 0xdf, 0x81, 0xd1, 0xf1,
0xd1, 0x1d, 0xa7, 0x19, 0x7a, 0xc2, 0x43, 0xd5, 0xe0, 0x72, 0xeb, 0xb8, 0x99, 0xed, 0x59, 0x73,
0x53, 0xe2, 0x6c, 0x99, 0x4e, 0xdf, 0xa6, 0xab, 0x46, 0xcb, 0xb0, 0x77, 0xc5, 0xc7, 0x5c, 0x4c,
0xb9, 0xa1, 0xd1, 0xfb, 0x1b, 0x7d, 0x32, 0x08, 0x35, 0xa1, 0xb2, 0x06, 0x75, 0x04, 0xe7, 0xe8,
0x4a, 0xa3, 0x70, 0xfa, 0x14, 0xf4, 0xac, 0x38, 0x7a, 0x0f, 0xf6, 0x1d, 0x1c, 0x05, 0x89, 0xc4,
0xd8, 0xd0, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xcb, 0x98, 0xb4, 0x3e, 0xc1, 0xc3, 0x81, 0x64, 0x12,
0x3b, 0x3e, 0xe3, 0x23, 0x4c, 0xf7, 0x3e, 0x92, 0x81, 0xe0, 0xf4, 0x35, 0xe8, 0xd9, 0x1d, 0xd0,
0xe3, 0xfc, 0x40, 0x36, 0xae, 0xa3, 0xba, 0xd5, 0xd3, 0xba, 0xf6, 0x9c, 0xb4, 0xdf, 0xfc, 0x9e,
0x59, 0xe4, 0xcf, 0xcc, 0x22, 0x7f, 0x67, 0x16, 0xf9, 0xf9, 0xcf, 0xd2, 0x3e, 0x3f, 0x1b, 0x05,
0xd2, 0x9f, 0x0c, 0x9b, 0xae, 0xb8, 0xb6, 0x7d, 0x96, 0xf8, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05,
0x4f, 0x26, 0xa1, 0xbd, 0x75, 0xc0, 0xc3, 0x92, 0x82, 0x5e, 0xfc, 0x0f, 0x00, 0x00, 0xff, 0xff,
0x8f, 0x56, 0x73, 0x78, 0xdc, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -751,6 +746,13 @@ func (m *SubscribeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Namespace) > 0 {
i -= len(m.Namespace)
copy(dAtA[i:], m.Namespace)
i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Namespace)))
i--
dAtA[i] = 0x32
}
if len(m.Datacenter) > 0 {
i -= len(m.Datacenter)
copy(dAtA[i:], m.Datacenter)
@ -821,18 +823,6 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if m.Index != 0 {
i = encodeVarintSubscribe(dAtA, i, uint64(m.Index))
i--
dAtA[i] = 0x18
}
if len(m.Key) > 0 {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0x12
}
if m.Topic != 0 {
i = encodeVarintSubscribe(dAtA, i, uint64(m.Topic))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
@ -851,7 +841,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0
}
i--
dAtA[i] = 0x28
dAtA[i] = 0x10
return len(dAtA) - i, nil
}
func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
@ -867,7 +857,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro
dAtA[i] = 0
}
i--
dAtA[i] = 0x30
dAtA[i] = 0x18
return len(dAtA) - i, nil
}
func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) {
@ -886,7 +876,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i = encodeVarintSubscribe(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x3a
dAtA[i] = 0x22
}
return len(dAtA) - i, nil
}
@ -1030,6 +1020,10 @@ func (m *SubscribeRequest) Size() (n int) {
if l > 0 {
n += 1 + l + sovSubscribe(uint64(l))
}
l = len(m.Namespace)
if l > 0 {
n += 1 + l + sovSubscribe(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -1042,13 +1036,6 @@ func (m *Event) Size() (n int) {
}
var l int
_ = l
if m.Topic != 0 {
n += 1 + sovSubscribe(uint64(m.Topic))
}
l = len(m.Key)
if l > 0 {
n += 1 + l + sovSubscribe(uint64(l))
}
if m.Index != 0 {
n += 1 + sovSubscribe(uint64(m.Index))
}
@ -1309,6 +1296,38 @@ func (m *SubscribeRequest) Unmarshal(dAtA []byte) error {
}
m.Datacenter = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSubscribe
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSubscribe
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSubscribe
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Namespace = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSubscribe(dAtA[iNdEx:])
@ -1364,57 +1383,6 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
}
m.Topic = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSubscribe
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Topic |= Topic(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSubscribe
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSubscribe
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSubscribe
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
}
@ -1433,7 +1401,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
break
}
}
case 5:
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType)
}
@ -1454,7 +1422,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
b := bool(v != 0)
m.Payload = &Event_EndOfSnapshot{b}
case 6:
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
}
@ -1475,7 +1443,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
b := bool(v != 0)
m.Payload = &Event_NewSnapshotToFollow{b}
case 7:
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)
}

View File

@ -73,43 +73,43 @@ message SubscribeRequest {
// the remote DC and proxy the results back to the subscriber. An empty
// string defaults to the local datacenter.
string Datacenter = 5;
// Namespace which contains the resources. If Namespace is not specified the
// default namespace will be used.
//
// Namespace is an enterprise-only feature.
string Namespace = 6;
}
// Event describes a streaming update on a subscription. Events are used both to
// describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot.
message Event {
// Topic the event was published to
Topic Topic = 1;
// Key is the logical identifier for the entity that was mutated.
string Key = 2;
// Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients.
uint64 Index = 3;
uint64 Index = 1;
// Payload is the actual event content.
oneof Payload {
// EndOfSnapshot indicates the event stream for the initial snapshot has
// ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5;
bool EndOfSnapshot = 2;
// NewSnapshotToFollow indicates that the client view is stale. The client
// must reset its view before handing any more events. Subsequent events
// in the stream will be for a new snapshot until an EndOfSnapshot event
// is received.
bool NewSnapshotToFollow = 6;
bool NewSnapshotToFollow = 3;
// EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft
// index (e.g. transactional updates). In this case the Topic and Index
// values of all events will match and the whole set should be delivered
// and consumed atomically.
EventBatch EventBatch = 7;
EventBatch EventBatch = 4;
// ServiceHealth is used for ServiceHealth and ServiceHealthConnect
// topics.