proto: remove Event.Namespace field

All events are part of a single Topic, so we don't need this field.
This commit is contained in:
Daniel Nephin 2020-10-27 13:43:20 -04:00
parent dbdc21c499
commit ab43236f86
6 changed files with 80 additions and 136 deletions

View File

@ -9,17 +9,15 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Topic: topic,
Index: index, Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
} }
} }
func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event { func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Topic: topic,
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
} }
} }
@ -37,7 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc, Key: svc,
Index: index, Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -117,7 +114,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
node := fmt.Sprintf("node%d", nodeNum) node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: svc, Key: svc,
Index: index, Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -164,7 +160,6 @@ func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event
events[i+1] = evs[i] events[i+1] = evs[i]
} }
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Topic: first.Topic,
Index: first.Index, Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{ Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events}, EventBatch: &pbsubscribe.EventBatch{Events: events},

View File

@ -26,7 +26,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// Initially there are no services registered. Server should send an // Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1. // EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1)) client.QueueEvents(newEndOfSnapshotEvent(1))
opts := cache.FetchOptions{ opts := cache.FetchOptions{
MinIndex: 0, MinIndex: 0,
@ -230,7 +230,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(5, 1), registerServiceWeb(5, 1),
registerServiceWeb(5, 2), registerServiceWeb(5, 2),
registerServiceWeb(5, 3), registerServiceWeb(5, 3),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -301,7 +301,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4), registerServiceWeb(50, 4),
registerServiceWeb(50, 5), registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) newEndOfSnapshotEvent(50))
// Make another blocking query with THE SAME index. It should immediately // Make another blocking query with THE SAME index. It should immediately
// return the new snapshot. // return the new snapshot.
@ -324,11 +324,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client.QueueErr(tempError("temporary connection error")) client.QueueErr(tempError("temporary connection error"))
client.QueueEvents( client.QueueEvents(
newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth), newNewSnapshotToFollowEvent(),
registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 3), // overlap existing node
registerServiceWeb(50, 4), registerServiceWeb(50, 4),
registerServiceWeb(50, 5), registerServiceWeb(50, 5),
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) newEndOfSnapshotEvent(50))
start := time.Now() start := time.Now()
opts.MinIndex = 49 opts.MinIndex = 49
@ -358,7 +358,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
newEventServiceHealthRegister(5, 3, "web")) newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents( client.QueueEvents(
batchEv, batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{
@ -428,7 +428,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) {
newEventServiceHealthRegister(5, 3, "web")) newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents( client.QueueEvents(
batchEv, batchEv,
newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls. // This contains the view state so important we share it between calls.
opts := cache.FetchOptions{ opts := cache.FetchOptions{

View File

@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
} }
elog.Trace(event) elog.Trace(event)
e := newEventFromStreamEvent(req.Topic, event) e := newEventFromStreamEvent(event)
if err := serverStream.Send(e); err != nil { if err := serverStream.Send(e); err != nil {
return err return err
} }
@ -139,12 +139,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
return event.Filter(fn) return event.Filter(fn)
} }
func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event { func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{ e := &pbsubscribe.Event{Key: event.Key, Index: event.Index}
Topic: topic,
Key: event.Key,
Index: event.Index,
}
switch { switch {
case event.IsEndOfSnapshot(): case event.IsEndOfSnapshot():
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}

View File

@ -107,7 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.For("reg3"), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -139,7 +138,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
}, },
}, },
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.For("reg3"), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -171,7 +169,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
}, },
}, },
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.For("reg3"), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
@ -192,7 +189,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
event := getEvent(t, chEvents) event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{ expectedEvent := &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -463,7 +459,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -495,7 +490,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
}, },
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -527,7 +521,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
}, },
{ {
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.Last(),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
@ -548,7 +541,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
event := getEvent(t, chEvents) event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{ expectedEvent := &pbsubscribe.Event{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
@ -905,11 +897,9 @@ func TestNewEventFromSteamEvent(t *testing.T) {
expected pbsubscribe.Event expected pbsubscribe.Event
} }
testTopic := pbsubscribe.Topic_ServiceHealthConnect
fn := func(t *testing.T, tc testCase) { fn := func(t *testing.T, tc testCase) {
expected := tc.expected expected := tc.expected
expected.Topic = testTopic actual := newEventFromStreamEvent(tc.event)
actual := newEventFromStreamEvent(testTopic, tc.event)
assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty()) assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty())
} }

View File

@ -6,14 +6,16 @@ package pbsubscribe
import ( import (
context "context" context "context"
fmt "fmt" fmt "fmt"
proto "github.com/golang/protobuf/proto"
pbservice "github.com/hashicorp/consul/proto/pbservice"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
io "io" io "io"
math "math" math "math"
math_bits "math/bits" math_bits "math/bits"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
pbservice "github.com/hashicorp/consul/proto/pbservice"
) )
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -198,16 +200,14 @@ func (m *SubscribeRequest) GetNamespace() string {
// describe the current "snapshot" of the result as well as ongoing mutations to // describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot. // that snapshot.
type Event struct { type Event struct {
// Topic the event was published to
Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"`
// Key is the logical identifier for the entity that was mutated. // Key is the logical identifier for the entity that was mutated.
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
// Index is the raft index at which the mutation took place. At the top // Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index. // level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft // If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single // transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients. // top-level event to ensure they are delivered atomically to clients.
Index uint64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` Index uint64 `protobuf:"varint,2,opt,name=Index,proto3" json:"Index,omitempty"`
// Payload is the actual event content. // Payload is the actual event content.
// //
// Types that are valid to be assigned to Payload: // Types that are valid to be assigned to Payload:
@ -261,13 +261,13 @@ type isEvent_Payload interface {
} }
type Event_EndOfSnapshot struct { type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"` EndOfSnapshot bool `protobuf:"varint,3,opt,name=EndOfSnapshot,proto3,oneof"`
} }
type Event_NewSnapshotToFollow struct { type Event_NewSnapshotToFollow struct {
NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"` NewSnapshotToFollow bool `protobuf:"varint,4,opt,name=NewSnapshotToFollow,proto3,oneof"`
} }
type Event_EventBatch struct { type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"` EventBatch *EventBatch `protobuf:"bytes,5,opt,name=EventBatch,proto3,oneof"`
} }
type Event_ServiceHealth struct { type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
@ -285,13 +285,6 @@ func (m *Event) GetPayload() isEvent_Payload {
return nil return nil
} }
func (m *Event) GetTopic() Topic {
if m != nil {
return m.Topic
}
return Topic_Unknown
}
func (m *Event) GetKey() string { func (m *Event) GetKey() string {
if m != nil { if m != nil {
return m.Key return m.Key
@ -353,17 +346,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
if x.EndOfSnapshot { if x.EndOfSnapshot {
t = 1 t = 1
} }
_ = b.EncodeVarint(5<<3 | proto.WireVarint) _ = b.EncodeVarint(3<<3 | proto.WireVarint)
_ = b.EncodeVarint(t) _ = b.EncodeVarint(t)
case *Event_NewSnapshotToFollow: case *Event_NewSnapshotToFollow:
t := uint64(0) t := uint64(0)
if x.NewSnapshotToFollow { if x.NewSnapshotToFollow {
t = 1 t = 1
} }
_ = b.EncodeVarint(6<<3 | proto.WireVarint) _ = b.EncodeVarint(4<<3 | proto.WireVarint)
_ = b.EncodeVarint(t) _ = b.EncodeVarint(t)
case *Event_EventBatch: case *Event_EventBatch:
_ = b.EncodeVarint(7<<3 | proto.WireBytes) _ = b.EncodeVarint(5<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.EventBatch); err != nil { if err := b.EncodeMessage(x.EventBatch); err != nil {
return err return err
} }
@ -382,21 +375,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*Event) m := msg.(*Event)
switch tag { switch tag {
case 5: // Payload.EndOfSnapshot case 3: // Payload.EndOfSnapshot
if wire != proto.WireVarint { if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
} }
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0} m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err return true, err
case 6: // Payload.NewSnapshotToFollow case 4: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint { if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
} }
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_NewSnapshotToFollow{x != 0} m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err return true, err
case 7: // Payload.EventBatch case 5: // Payload.EventBatch
if wire != proto.WireBytes { if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
} }
@ -558,41 +551,41 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{ var fileDescriptor_ab3eb8c810e315fb = []byte{
// 543 bytes of a gzipped FileDescriptorProto // 542 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0xcd, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x14, 0xf5, 0x24, 0xfd, 0xf3, 0xcd, 0xd7, 0xca, 0xdf, 0xb4, 0x08, 0x2b, 0x45, 0x56, 0x88, 0x50, 0x10, 0xf6, 0xba, 0xbf, 0x9e, 0xd0, 0xca, 0x6c, 0x8b, 0xb0, 0x5a, 0x64, 0x85, 0x08, 0x55, 0xa1,
0x65, 0x2a, 0x11, 0xa3, 0x20, 0xc1, 0x0e, 0x44, 0xd2, 0x96, 0x20, 0xa4, 0x04, 0x39, 0xed, 0x02, 0x12, 0x31, 0x0a, 0x12, 0xdc, 0x40, 0x24, 0x6d, 0x09, 0x42, 0x4a, 0x90, 0xd3, 0x1e, 0xe0, 0xb6,
0x76, 0x13, 0xfb, 0x12, 0x5b, 0x71, 0x67, 0x8c, 0x3d, 0x69, 0xe8, 0x9e, 0x87, 0xe0, 0x49, 0xd8, 0xb1, 0x87, 0xd8, 0x8a, 0xbb, 0x6b, 0xec, 0x4d, 0x43, 0xef, 0xbc, 0x03, 0x3c, 0x09, 0xcf, 0xc0,
0xb2, 0x65, 0xc9, 0x23, 0xa0, 0xf0, 0x22, 0x28, 0x13, 0xc7, 0x71, 0x92, 0xee, 0xd8, 0xcd, 0x3d, 0x91, 0x47, 0x40, 0xe1, 0x45, 0x50, 0x36, 0x8e, 0x63, 0x27, 0xbd, 0x79, 0xbe, 0x9f, 0xdd, 0xcf,
0x3f, 0xe3, 0xa3, 0x39, 0xbe, 0xf0, 0x30, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, 0xb3, 0x33, 0xf0, 0x38, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, 0x4b, 0xc2, 0x01,
0x4b, 0xc2, 0x01, 0x3a, 0xf9, 0xa9, 0xa1, 0x38, 0xaa, 0xe7, 0x40, 0xb5, 0x9a, 0xab, 0x31, 0xb9, 0x3a, 0xf9, 0x57, 0x43, 0x71, 0xd4, 0xc8, 0x81, 0xa3, 0xa3, 0x5c, 0x8d, 0xc9, 0x4d, 0xe8, 0xa1,
0x09, 0x3d, 0x74, 0xb8, 0xf0, 0x33, 0x59, 0xfd, 0x3b, 0x01, 0xa3, 0xbf, 0x50, 0xba, 0xf8, 0x79, 0xc3, 0x85, 0x9f, 0xc9, 0x6a, 0xbf, 0x08, 0x98, 0xfd, 0x85, 0xd2, 0xc5, 0xaf, 0x63, 0x4c, 0x25,
0x8c, 0xa9, 0xa4, 0x27, 0xb0, 0x7d, 0x29, 0xe2, 0xd0, 0x33, 0x49, 0x8d, 0xd8, 0x07, 0x4d, 0xa3, 0x3d, 0x81, 0xad, 0x4b, 0x11, 0x87, 0x9e, 0x45, 0xaa, 0xa4, 0xbe, 0xdf, 0x34, 0x1b, 0xcb, 0xc3,
0xb1, 0xbc, 0x5c, 0xe1, 0xee, 0x9c, 0xa6, 0x06, 0x94, 0xdf, 0xe1, 0xad, 0x59, 0xaa, 0x11, 0x5b, 0x15, 0xee, 0xce, 0x69, 0x6a, 0xc2, 0xc6, 0x07, 0xbc, 0xb5, 0xf4, 0x2a, 0xa9, 0x1b, 0xee, 0xec,
0x77, 0x67, 0x47, 0x7a, 0x34, 0x73, 0x8e, 0x90, 0x9b, 0x65, 0x85, 0xcd, 0x87, 0x19, 0xfa, 0x96, 0x93, 0x1e, 0xce, 0x9c, 0x23, 0xe4, 0xd6, 0x86, 0xc2, 0xe6, 0xc5, 0x0c, 0x7d, 0xcf, 0x7d, 0xfc,
0xfb, 0xf8, 0xc5, 0xdc, 0xaa, 0x11, 0x7b, 0xcb, 0x9d, 0x0f, 0xd4, 0x02, 0x38, 0x63, 0x92, 0x79, 0x66, 0x6d, 0x56, 0x49, 0x7d, 0xd3, 0x9d, 0x17, 0xd4, 0x06, 0x38, 0x63, 0x92, 0x79, 0xc8, 0x25,
0xc8, 0x25, 0x26, 0xe6, 0xb6, 0x32, 0x14, 0x10, 0xfa, 0x00, 0xf4, 0x2e, 0xbb, 0xc6, 0x34, 0x66, 0x26, 0xd6, 0x96, 0x32, 0x14, 0x10, 0xfa, 0x08, 0x8c, 0x2e, 0xbb, 0xc6, 0x34, 0x66, 0x1e, 0x5a,
0x1e, 0x9a, 0x3b, 0x8a, 0x5e, 0x02, 0xf5, 0x1f, 0x25, 0xd8, 0x3e, 0xbf, 0x41, 0xfe, 0x8f, 0x69, 0xdb, 0x8a, 0x5e, 0x02, 0xb5, 0x1f, 0x3a, 0x6c, 0x9d, 0xdf, 0x20, 0x97, 0x8b, 0x14, 0xa4, 0x94,
0xe7, 0xb9, 0xca, 0xc5, 0x5c, 0x27, 0xb0, 0x7f, 0xce, 0xfd, 0xde, 0xa7, 0x3e, 0x67, 0x71, 0x1a, 0x62, 0x7e, 0x9f, 0x5e, 0xbc, 0xef, 0x04, 0xf6, 0xce, 0xb9, 0xdf, 0xfb, 0xd2, 0xe7, 0x2c, 0x4e,
0x08, 0xa9, 0xa2, 0xed, 0x75, 0x34, 0x77, 0x15, 0xa6, 0x4d, 0x38, 0xec, 0xe2, 0x64, 0x31, 0x5e, 0x03, 0x21, 0x55, 0xc6, 0xdd, 0x8e, 0xe6, 0x96, 0x61, 0xda, 0x84, 0x83, 0x2e, 0x4e, 0x16, 0xe5,
0x8a, 0x0b, 0x11, 0x45, 0x62, 0xa2, 0x92, 0xce, 0xd4, 0x77, 0x91, 0xf4, 0x05, 0x80, 0x0a, 0xdd, 0xa5, 0xb8, 0x10, 0x51, 0x24, 0x26, 0x2a, 0xfb, 0x4c, 0x7d, 0x17, 0x49, 0x5f, 0x01, 0xa8, 0x30,
0x62, 0xd2, 0x0b, 0xcc, 0xdd, 0x1a, 0xb1, 0x2b, 0xcd, 0x7b, 0x85, 0xc0, 0x4b, 0xb2, 0xa3, 0xb9, 0x2d, 0x26, 0xbd, 0x40, 0xfd, 0x4b, 0xa5, 0xf9, 0xa0, 0xd0, 0xb6, 0x25, 0xd9, 0xd1, 0xdc, 0x82,
0x05, 0x29, 0xbd, 0x80, 0xfd, 0xfe, 0xbc, 0xbd, 0x0e, 0xb2, 0x48, 0x06, 0x26, 0x28, 0xaf, 0x55, 0x94, 0x5e, 0xc0, 0x5e, 0x7f, 0xfe, 0x2a, 0x1d, 0x64, 0x91, 0x0c, 0x2c, 0x50, 0x5e, 0xbb, 0xe0,
0xf0, 0xae, 0xf0, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0xaf, 0xc0, 0x2d, 0x1d, 0x76, 0xdf, 0xb3, 0x2d, 0xf1, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0x97, 0xe0, 0x96, 0x01, 0x3b, 0x1f, 0xd9, 0x6d,
0xdb, 0x48, 0x30, 0xbf, 0xfe, 0xbc, 0x98, 0x85, 0xda, 0xb0, 0xa3, 0xa6, 0xd4, 0x24, 0xb5, 0xb2, 0x24, 0x98, 0x5f, 0x7b, 0x59, 0xcc, 0x42, 0xeb, 0xb0, 0xad, 0xaa, 0xd4, 0x22, 0xd5, 0x8d, 0x7a,
0x5d, 0x59, 0x79, 0x46, 0x45, 0xb8, 0x19, 0x5f, 0xff, 0x4a, 0xe0, 0xf0, 0x8e, 0x6f, 0xd1, 0x47, 0xa5, 0xf4, 0x98, 0x8a, 0x70, 0x33, 0xbe, 0xf6, 0x9d, 0xc0, 0xc1, 0x1d, 0x77, 0xd1, 0x27, 0xa0,
0x50, 0xea, 0xc5, 0x59, 0x09, 0x47, 0x05, 0x77, 0x9b, 0x49, 0x16, 0x89, 0x61, 0x2f, 0x76, 0x4b, 0xf7, 0xe2, 0x6c, 0x14, 0x0e, 0x0b, 0xee, 0x36, 0x93, 0x2c, 0x12, 0xc3, 0x5e, 0xec, 0xea, 0xbd,
0xbd, 0x98, 0xbe, 0x01, 0xa3, 0x1d, 0xa0, 0x37, 0xca, 0x6e, 0xe8, 0x0a, 0x1f, 0x55, 0x25, 0x95, 0x98, 0xbe, 0x03, 0xb3, 0x1d, 0xa0, 0x37, 0xca, 0x4e, 0xe8, 0x0a, 0x1f, 0x55, 0xfb, 0x2b, 0xcd,
0xe6, 0x71, 0x23, 0xff, 0x43, 0x1b, 0xeb, 0x12, 0x77, 0xc3, 0x74, 0xfa, 0x3a, 0xab, 0x9d, 0x56, 0xe3, 0x46, 0x3e, 0x79, 0x8d, 0x55, 0x89, 0xbb, 0x66, 0x3a, 0x7d, 0x9b, 0x0d, 0x1f, 0xad, 0xc0,
0x60, 0xf7, 0x8a, 0x8f, 0xb8, 0x98, 0x70, 0x43, 0xa3, 0xff, 0xaf, 0xbd, 0x93, 0x41, 0xa8, 0x09, 0xce, 0x15, 0x1f, 0x71, 0x31, 0xe1, 0xa6, 0x46, 0xef, 0xaf, 0xf4, 0xc9, 0x24, 0xd4, 0x82, 0xc3,
0x47, 0x2b, 0x50, 0x5b, 0x70, 0x8e, 0x9e, 0x34, 0x4a, 0xa7, 0x8f, 0x41, 0xcf, 0xc3, 0xd1, 0xff, 0x12, 0xd4, 0x16, 0x9c, 0xa3, 0x27, 0x4d, 0xfd, 0xf4, 0x29, 0x18, 0x79, 0x38, 0x7a, 0x0f, 0x76,
0x60, 0xcf, 0xc5, 0x61, 0x98, 0x4a, 0x4c, 0x0c, 0x8d, 0x1e, 0x00, 0x9c, 0x61, 0xb2, 0x98, 0x49, 0x5d, 0x1c, 0x86, 0xa9, 0xc4, 0xc4, 0xd4, 0xe8, 0x3e, 0xc0, 0x19, 0x26, 0x8b, 0x9a, 0x34, 0x3f,
0xf3, 0x03, 0xdc, 0xef, 0x4b, 0x26, 0xb1, 0x1d, 0x30, 0x3e, 0xc4, 0x6c, 0x63, 0x62, 0x19, 0x0a, 0xc1, 0xc3, 0xbe, 0x64, 0x12, 0xdb, 0x01, 0xe3, 0x43, 0xcc, 0x36, 0x21, 0x96, 0xa1, 0xe0, 0xf4,
0x4e, 0x5f, 0x82, 0x9e, 0x6f, 0x10, 0x3d, 0x2e, 0x16, 0xb2, 0xb6, 0x57, 0xd5, 0x8d, 0x37, 0xad, 0x35, 0x18, 0xf9, 0x66, 0xd0, 0xe3, 0xe2, 0x83, 0xac, 0xec, 0xcb, 0xd1, 0x5a, 0x4f, 0x6b, 0xda,
0x6b, 0x4f, 0x49, 0xeb, 0xd5, 0xcf, 0xa9, 0x45, 0x7e, 0x4d, 0x2d, 0xf2, 0x7b, 0x6a, 0x91, 0x6f, 0x73, 0xd2, 0x7a, 0xf3, 0x7b, 0x6a, 0x93, 0x3f, 0x53, 0x9b, 0xfc, 0x9d, 0xda, 0xe4, 0xe7, 0x3f,
0x7f, 0x2c, 0xed, 0xe3, 0x93, 0x61, 0x28, 0x83, 0xf1, 0xa0, 0xe1, 0x89, 0x6b, 0x27, 0x60, 0x69, 0x5b, 0xfb, 0xfc, 0x6c, 0x18, 0xca, 0x60, 0x3c, 0x68, 0x78, 0xe2, 0xda, 0x09, 0x58, 0x1a, 0x84,
0x10, 0x7a, 0x22, 0x89, 0x1d, 0x4f, 0xf0, 0x74, 0x1c, 0x39, 0x1b, 0xab, 0x3f, 0xd8, 0x51, 0xd0, 0x9e, 0x48, 0x62, 0xc7, 0x13, 0x3c, 0x1d, 0x47, 0xce, 0xda, 0x4a, 0x0f, 0xb6, 0x15, 0xf4, 0xe2,
0xb3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x45, 0xc8, 0x3f, 0xd7, 0x16, 0x04, 0x00, 0x00, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xe5, 0xc9, 0x9d, 0xee, 0x03, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -841,19 +834,14 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if m.Index != 0 { if m.Index != 0 {
i = encodeVarintSubscribe(dAtA, i, uint64(m.Index)) i = encodeVarintSubscribe(dAtA, i, uint64(m.Index))
i-- i--
dAtA[i] = 0x18 dAtA[i] = 0x10
} }
if len(m.Key) > 0 { if len(m.Key) > 0 {
i -= len(m.Key) i -= len(m.Key)
copy(dAtA[i:], m.Key) copy(dAtA[i:], m.Key)
i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key))) i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key)))
i-- i--
dAtA[i] = 0x12 dAtA[i] = 0xa
}
if m.Topic != 0 {
i = encodeVarintSubscribe(dAtA, i, uint64(m.Topic))
i--
dAtA[i] = 0x8
} }
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
@ -871,7 +859,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0 dAtA[i] = 0
} }
i-- i--
dAtA[i] = 0x28 dAtA[i] = 0x18
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) { func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
@ -887,7 +875,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro
dAtA[i] = 0 dAtA[i] = 0
} }
i-- i--
dAtA[i] = 0x30 dAtA[i] = 0x20
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) { func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) {
@ -906,7 +894,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i = encodeVarintSubscribe(dAtA, i, uint64(size)) i = encodeVarintSubscribe(dAtA, i, uint64(size))
} }
i-- i--
dAtA[i] = 0x3a dAtA[i] = 0x2a
} }
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
@ -1066,9 +1054,6 @@ func (m *Event) Size() (n int) {
} }
var l int var l int
_ = l _ = l
if m.Topic != 0 {
n += 1 + sovSubscribe(uint64(m.Topic))
}
l = len(m.Key) l = len(m.Key)
if l > 0 { if l > 0 {
n += 1 + l + sovSubscribe(uint64(l)) n += 1 + l + sovSubscribe(uint64(l))
@ -1420,25 +1405,6 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
switch fieldNum { switch fieldNum {
case 1: case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
}
m.Topic = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSubscribe
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Topic |= Topic(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
} }
@ -1470,7 +1436,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
m.Key = string(dAtA[iNdEx:postIndex]) m.Key = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 3: case 2:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
} }
@ -1489,7 +1455,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
break break
} }
} }
case 5: case 3:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType) return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType)
} }
@ -1510,7 +1476,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
b := bool(v != 0) b := bool(v != 0)
m.Payload = &Event_EndOfSnapshot{b} m.Payload = &Event_EndOfSnapshot{b}
case 6: case 4:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType) return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
} }
@ -1531,7 +1497,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
b := bool(v != 0) b := bool(v != 0)
m.Payload = &Event_NewSnapshotToFollow{b} m.Payload = &Event_NewSnapshotToFollow{b}
case 7: case 5:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)
} }

View File

@ -85,37 +85,34 @@ message SubscribeRequest {
// describe the current "snapshot" of the result as well as ongoing mutations to // describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot. // that snapshot.
message Event { message Event {
// Topic the event was published to
Topic Topic = 1;
// Key is the logical identifier for the entity that was mutated. // Key is the logical identifier for the entity that was mutated.
string Key = 2; string Key = 1;
// Index is the raft index at which the mutation took place. At the top // Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index. // level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft // If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single // transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients. // top-level event to ensure they are delivered atomically to clients.
uint64 Index = 3; uint64 Index = 2;
// Payload is the actual event content. // Payload is the actual event content.
oneof Payload { oneof Payload {
// EndOfSnapshot indicates the event stream for the initial snapshot has // EndOfSnapshot indicates the event stream for the initial snapshot has
// ended. Subsequent Events delivered will be mutations to that result. // ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5; bool EndOfSnapshot = 3;
// NewSnapshotToFollow indicates that the client view is stale. The client // NewSnapshotToFollow indicates that the client view is stale. The client
// must reset its view before handing any more events. Subsequent events // must reset its view before handing any more events. Subsequent events
// in the stream will be for a new snapshot until an EndOfSnapshot event // in the stream will be for a new snapshot until an EndOfSnapshot event
// is received. // is received.
bool NewSnapshotToFollow = 6; bool NewSnapshotToFollow = 4;
// EventBatch is a set of events. This is typically used as the payload // EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft // type where multiple events are emitted in a single topic and raft
// index (e.g. transactional updates). In this case the Topic and Index // index (e.g. transactional updates). In this case the Topic and Index
// values of all events will match and the whole set should be delivered // values of all events will match and the whole set should be delivered
// and consumed atomically. // and consumed atomically.
EventBatch EventBatch = 7; EventBatch EventBatch = 5;
// ServiceHealth is used for ServiceHealth and ServiceHealthConnect // ServiceHealth is used for ServiceHealth and ServiceHealthConnect
// topics. // topics.