proto: remove Event.Key field

The field is never used, and the value is available from the payload.
This commit is contained in:
Daniel Nephin 2020-10-27 13:53:11 -04:00
parent ab43236f86
commit c106d94742
5 changed files with 62 additions and 137 deletions

View File

@ -35,7 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -114,7 +113,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Key: svc,
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{

View File

@ -140,7 +140,7 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
}
func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{Key: event.Key, Index: event.Index}
e := &pbsubscribe.Event{Index: event.Index}
switch {
case event.IsEndOfSnapshot():
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
@ -178,7 +178,7 @@ func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event {
result := make([]*pbsubscribe.Event, len(events))
for i := range events {
event := events[i]
result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index}
result[i] = &pbsubscribe.Event{Index: event.Index}
setPayload(result[i], event.Payload)
}
return result

View File

@ -107,7 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{
{
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -138,7 +137,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
},
},
{
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -169,7 +167,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
},
},
{
Key: "redis",
Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
@ -189,7 +186,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -459,7 +455,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{
{
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -490,7 +485,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
},
},
{
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -521,7 +515,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
},
},
{
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
@ -541,7 +534,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
event := getEvent(t, chEvents)
expectedEvent := &pbsubscribe.Event{
Key: "redis",
Index: ids.Last(),
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -922,11 +914,9 @@ func TestNewEventFromSteamEvent(t *testing.T) {
{
name: "event batch",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: []stream.Event{
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -937,7 +927,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
@ -950,13 +939,11 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: []*pbsubscribe.Event{
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -969,7 +956,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
@ -989,7 +975,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
{
name: "event payload CheckServiceNode",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -1000,7 +985,6 @@ func TestNewEventFromSteamEvent(t *testing.T) {
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{

View File

@ -6,16 +6,14 @@ package pbsubscribe
import (
context "context"
fmt "fmt"
io "io"
math "math"
math_bits "math/bits"
proto "github.com/golang/protobuf/proto"
pbservice "github.com/hashicorp/consul/proto/pbservice"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
pbservice "github.com/hashicorp/consul/proto/pbservice"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -200,14 +198,12 @@ func (m *SubscribeRequest) GetNamespace() string {
// describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot.
type Event struct {
// Key is the logical identifier for the entity that was mutated.
Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
// Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients.
Index uint64 `protobuf:"varint,2,opt,name=Index,proto3" json:"Index,omitempty"`
Index uint64 `protobuf:"varint,1,opt,name=Index,proto3" json:"Index,omitempty"`
// Payload is the actual event content.
//
// Types that are valid to be assigned to Payload:
@ -261,13 +257,13 @@ type isEvent_Payload interface {
}
type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,3,opt,name=EndOfSnapshot,proto3,oneof"`
EndOfSnapshot bool `protobuf:"varint,2,opt,name=EndOfSnapshot,proto3,oneof"`
}
type Event_NewSnapshotToFollow struct {
NewSnapshotToFollow bool `protobuf:"varint,4,opt,name=NewSnapshotToFollow,proto3,oneof"`
NewSnapshotToFollow bool `protobuf:"varint,3,opt,name=NewSnapshotToFollow,proto3,oneof"`
}
type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,5,opt,name=EventBatch,proto3,oneof"`
EventBatch *EventBatch `protobuf:"bytes,4,opt,name=EventBatch,proto3,oneof"`
}
type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
@ -285,13 +281,6 @@ func (m *Event) GetPayload() isEvent_Payload {
return nil
}
func (m *Event) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Event) GetIndex() uint64 {
if m != nil {
return m.Index
@ -346,17 +335,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
if x.EndOfSnapshot {
t = 1
}
_ = b.EncodeVarint(3<<3 | proto.WireVarint)
_ = b.EncodeVarint(2<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_NewSnapshotToFollow:
t := uint64(0)
if x.NewSnapshotToFollow {
t = 1
}
_ = b.EncodeVarint(4<<3 | proto.WireVarint)
_ = b.EncodeVarint(3<<3 | proto.WireVarint)
_ = b.EncodeVarint(t)
case *Event_EventBatch:
_ = b.EncodeVarint(5<<3 | proto.WireBytes)
_ = b.EncodeVarint(4<<3 | proto.WireBytes)
if err := b.EncodeMessage(x.EventBatch); err != nil {
return err
}
@ -375,21 +364,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*Event)
switch tag {
case 3: // Payload.EndOfSnapshot
case 2: // Payload.EndOfSnapshot
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err
case 4: // Payload.NewSnapshotToFollow
case 3: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err
case 5: // Payload.EventBatch
case 4: // Payload.EventBatch
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
@ -551,41 +540,41 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{
// 542 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xf6, 0xba, 0xbf, 0x9e, 0xd0, 0xca, 0x6c, 0x8b, 0xb0, 0x5a, 0x64, 0x85, 0x08, 0x55, 0xa1,
0x12, 0x31, 0x0a, 0x12, 0xdc, 0x40, 0x24, 0x6d, 0x09, 0x42, 0x4a, 0x90, 0xd3, 0x1e, 0xe0, 0xb6,
0xb1, 0x87, 0xd8, 0x8a, 0xbb, 0x6b, 0xec, 0x4d, 0x43, 0xef, 0xbc, 0x03, 0x3c, 0x09, 0xcf, 0xc0,
0x91, 0x47, 0x40, 0xe1, 0x45, 0x50, 0x36, 0x8e, 0x63, 0x27, 0xbd, 0x79, 0xbe, 0x9f, 0xdd, 0xcf,
0xb3, 0x33, 0xf0, 0x38, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, 0x4b, 0xc2, 0x01,
0x3a, 0xf9, 0x57, 0x43, 0x71, 0xd4, 0xc8, 0x81, 0xa3, 0xa3, 0x5c, 0x8d, 0xc9, 0x4d, 0xe8, 0xa1,
0xc3, 0x85, 0x9f, 0xc9, 0x6a, 0xbf, 0x08, 0x98, 0xfd, 0x85, 0xd2, 0xc5, 0xaf, 0x63, 0x4c, 0x25,
0x3d, 0x81, 0xad, 0x4b, 0x11, 0x87, 0x9e, 0x45, 0xaa, 0xa4, 0xbe, 0xdf, 0x34, 0x1b, 0xcb, 0xc3,
0x15, 0xee, 0xce, 0x69, 0x6a, 0xc2, 0xc6, 0x07, 0xbc, 0xb5, 0xf4, 0x2a, 0xa9, 0x1b, 0xee, 0xec,
0x93, 0x1e, 0xce, 0x9c, 0x23, 0xe4, 0xd6, 0x86, 0xc2, 0xe6, 0xc5, 0x0c, 0x7d, 0xcf, 0x7d, 0xfc,
0x66, 0x6d, 0x56, 0x49, 0x7d, 0xd3, 0x9d, 0x17, 0xd4, 0x06, 0x38, 0x63, 0x92, 0x79, 0xc8, 0x25,
0x26, 0xd6, 0x96, 0x32, 0x14, 0x10, 0xfa, 0x08, 0x8c, 0x2e, 0xbb, 0xc6, 0x34, 0x66, 0x1e, 0x5a,
0xdb, 0x8a, 0x5e, 0x02, 0xb5, 0x1f, 0x3a, 0x6c, 0x9d, 0xdf, 0x20, 0x97, 0x8b, 0x14, 0xa4, 0x94,
0x62, 0x7e, 0x9f, 0x5e, 0xbc, 0xef, 0x04, 0xf6, 0xce, 0xb9, 0xdf, 0xfb, 0xd2, 0xe7, 0x2c, 0x4e,
0x03, 0x21, 0x55, 0xc6, 0xdd, 0x8e, 0xe6, 0x96, 0x61, 0xda, 0x84, 0x83, 0x2e, 0x4e, 0x16, 0xe5,
0xa5, 0xb8, 0x10, 0x51, 0x24, 0x26, 0x2a, 0xfb, 0x4c, 0x7d, 0x17, 0x49, 0x5f, 0x01, 0xa8, 0x30,
0x2d, 0x26, 0xbd, 0x40, 0xfd, 0x4b, 0xa5, 0xf9, 0xa0, 0xd0, 0xb6, 0x25, 0xd9, 0xd1, 0xdc, 0x82,
0x94, 0x5e, 0xc0, 0x5e, 0x7f, 0xfe, 0x2a, 0x1d, 0x64, 0x91, 0x0c, 0x2c, 0x50, 0x5e, 0xbb, 0xe0,
0x2d, 0xf1, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0x97, 0xe0, 0x96, 0x01, 0x3b, 0x1f, 0xd9, 0x6d,
0x24, 0x98, 0x5f, 0x7b, 0x59, 0xcc, 0x42, 0xeb, 0xb0, 0xad, 0xaa, 0xd4, 0x22, 0xd5, 0x8d, 0x7a,
0xa5, 0xf4, 0x98, 0x8a, 0x70, 0x33, 0xbe, 0xf6, 0x9d, 0xc0, 0xc1, 0x1d, 0x77, 0xd1, 0x27, 0xa0,
0xf7, 0xe2, 0x6c, 0x14, 0x0e, 0x0b, 0xee, 0x36, 0x93, 0x2c, 0x12, 0xc3, 0x5e, 0xec, 0xea, 0xbd,
0x98, 0xbe, 0x03, 0xb3, 0x1d, 0xa0, 0x37, 0xca, 0x4e, 0xe8, 0x0a, 0x1f, 0x55, 0xfb, 0x2b, 0xcd,
0xe3, 0x46, 0x3e, 0x79, 0x8d, 0x55, 0x89, 0xbb, 0x66, 0x3a, 0x7d, 0x9b, 0x0d, 0x1f, 0xad, 0xc0,
0xce, 0x15, 0x1f, 0x71, 0x31, 0xe1, 0xa6, 0x46, 0xef, 0xaf, 0xf4, 0xc9, 0x24, 0xd4, 0x82, 0xc3,
0x12, 0xd4, 0x16, 0x9c, 0xa3, 0x27, 0x4d, 0xfd, 0xf4, 0x29, 0x18, 0x79, 0x38, 0x7a, 0x0f, 0x76,
0x5d, 0x1c, 0x86, 0xa9, 0xc4, 0xc4, 0xd4, 0xe8, 0x3e, 0xc0, 0x19, 0x26, 0x8b, 0x9a, 0x34, 0x3f,
0xc1, 0xc3, 0xbe, 0x64, 0x12, 0xdb, 0x01, 0xe3, 0x43, 0xcc, 0x36, 0x21, 0x96, 0xa1, 0xe0, 0xf4,
0x35, 0x18, 0xf9, 0x66, 0xd0, 0xe3, 0xe2, 0x83, 0xac, 0xec, 0xcb, 0xd1, 0x5a, 0x4f, 0x6b, 0xda,
0x73, 0xd2, 0x7a, 0xf3, 0x7b, 0x6a, 0x93, 0x3f, 0x53, 0x9b, 0xfc, 0x9d, 0xda, 0xe4, 0xe7, 0x3f,
0x5b, 0xfb, 0xfc, 0x6c, 0x18, 0xca, 0x60, 0x3c, 0x68, 0x78, 0xe2, 0xda, 0x09, 0x58, 0x1a, 0x84,
0x9e, 0x48, 0x62, 0xc7, 0x13, 0x3c, 0x1d, 0x47, 0xce, 0xda, 0x4a, 0x0f, 0xb6, 0x15, 0xf4, 0xe2,
0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xe5, 0xc9, 0x9d, 0xee, 0x03, 0x00, 0x00,
// 536 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0x65, 0x83, 0xad, 0x07, 0xb7, 0xd4, 0x3b, 0x8c, 0x0d, 0x33, 0x0d, 0x12, 0xb3, 0xe0,
0x12, 0xa9, 0xc1, 0x44, 0xdf, 0x34, 0xc2, 0x36, 0x31, 0x26, 0x60, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x48, 0x1b, 0xba, 0x7b, 0x6b, 0x7b, 0x19, 0xee, 0x5d, 0xbf, 0x83, 0x9f, 0xc4, 0xcf, 0xe0,
0xa3, 0x1f, 0xc1, 0xe0, 0x17, 0x31, 0x5c, 0x4a, 0x29, 0xb0, 0xb7, 0x9e, 0xdf, 0x9f, 0x73, 0x4f,
0xcf, 0x1f, 0x78, 0x1c, 0xc5, 0x42, 0x0a, 0x3b, 0x1a, 0x26, 0x93, 0x61, 0xe2, 0xc6, 0xc1, 0x10,
0xed, 0xec, 0xab, 0xa9, 0x38, 0xaa, 0x67, 0x40, 0xb5, 0x9a, 0xa9, 0x31, 0xbe, 0x09, 0x5c, 0xb4,
0xb9, 0xf0, 0x52, 0x59, 0xfd, 0x17, 0x01, 0x63, 0xb0, 0x54, 0x3a, 0xf8, 0x75, 0x82, 0x89, 0xa4,
0x27, 0x50, 0xbc, 0x14, 0x51, 0xe0, 0x9a, 0xa4, 0x46, 0x1a, 0x87, 0x2d, 0xa3, 0xb9, 0x4a, 0xae,
0x70, 0x67, 0x41, 0x53, 0x03, 0x76, 0x3e, 0xe0, 0xad, 0x59, 0xa8, 0x91, 0x86, 0xee, 0xcc, 0x3f,
0x69, 0x65, 0xee, 0x1c, 0x23, 0x37, 0x77, 0x14, 0xb6, 0x08, 0xe6, 0xe8, 0x7b, 0xee, 0xe1, 0x37,
0x73, 0xb7, 0x46, 0x1a, 0xbb, 0xce, 0x22, 0xa0, 0x16, 0xc0, 0x19, 0x93, 0xcc, 0x45, 0x2e, 0x31,
0x36, 0x8b, 0xca, 0x90, 0x43, 0xe8, 0x23, 0xd0, 0x7b, 0xec, 0x1a, 0x93, 0x88, 0xb9, 0x68, 0x96,
0x14, 0xbd, 0x02, 0xea, 0x3f, 0x0a, 0x50, 0x3c, 0xbf, 0x41, 0x2e, 0x57, 0xd9, 0x49, 0x3e, 0xfb,
0x09, 0x1c, 0x9c, 0x73, 0xaf, 0xff, 0x65, 0xc0, 0x59, 0x94, 0xf8, 0x42, 0xaa, 0x2a, 0xf7, 0xbb,
0x9a, 0xb3, 0x0e, 0xd3, 0x16, 0x1c, 0xf5, 0x70, 0xba, 0x0c, 0x2f, 0xc5, 0x85, 0x08, 0x43, 0x31,
0x55, 0xf5, 0xcf, 0xd5, 0x77, 0x91, 0xf4, 0x15, 0x80, 0x7a, 0xba, 0xcd, 0xa4, 0xeb, 0xab, 0x9f,
0x2a, 0xb7, 0x1e, 0xe4, 0x9a, 0xb4, 0x22, 0xbb, 0x9a, 0x93, 0x93, 0xd2, 0x0b, 0x38, 0x18, 0x2c,
0x66, 0xd0, 0x45, 0x16, 0x4a, 0xdf, 0x04, 0xe5, 0xb5, 0x72, 0xde, 0x35, 0xfe, 0x2a, 0xf2, 0x98,
0xc4, 0x79, 0xd1, 0x6b, 0x70, 0x5b, 0x87, 0xbd, 0x8f, 0xec, 0x36, 0x14, 0xcc, 0xab, 0xbf, 0xcc,
0xd7, 0x42, 0x1b, 0x50, 0x52, 0x51, 0x62, 0x92, 0xda, 0x4e, 0xa3, 0xbc, 0x36, 0x3a, 0x45, 0x38,
0x29, 0x5f, 0xff, 0x4e, 0xe0, 0xe8, 0x8e, 0xb7, 0xe8, 0x13, 0x28, 0xf4, 0xa3, 0x74, 0xf0, 0x95,
0x9c, 0xbb, 0xc3, 0x24, 0x0b, 0xc5, 0xa8, 0x1f, 0x39, 0x85, 0x7e, 0x44, 0xdf, 0x81, 0xd1, 0xf1,
0xd1, 0x1d, 0xa7, 0x19, 0x7a, 0xc2, 0x43, 0xd5, 0xe0, 0x72, 0xeb, 0xb8, 0x99, 0xed, 0x59, 0x73,
0x53, 0xe2, 0x6c, 0x99, 0x4e, 0xdf, 0xa6, 0xab, 0x46, 0xcb, 0xb0, 0x77, 0xc5, 0xc7, 0x5c, 0x4c,
0xb9, 0xa1, 0xd1, 0xfb, 0x1b, 0x7d, 0x32, 0x08, 0x35, 0xa1, 0xb2, 0x06, 0x75, 0x04, 0xe7, 0xe8,
0x4a, 0xa3, 0x70, 0xfa, 0x14, 0xf4, 0xac, 0x38, 0x7a, 0x0f, 0xf6, 0x1d, 0x1c, 0x05, 0x89, 0xc4,
0xd8, 0xd0, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xcb, 0x98, 0xb4, 0x3e, 0xc1, 0xc3, 0x81, 0x64, 0x12,
0x3b, 0x3e, 0xe3, 0x23, 0x4c, 0xf7, 0x3e, 0x92, 0x81, 0xe0, 0xf4, 0x35, 0xe8, 0xd9, 0x1d, 0xd0,
0xe3, 0xfc, 0x40, 0x36, 0xae, 0xa3, 0xba, 0xd5, 0xd3, 0xba, 0xf6, 0x9c, 0xb4, 0xdf, 0xfc, 0x9e,
0x59, 0xe4, 0xcf, 0xcc, 0x22, 0x7f, 0x67, 0x16, 0xf9, 0xf9, 0xcf, 0xd2, 0x3e, 0x3f, 0x1b, 0x05,
0xd2, 0x9f, 0x0c, 0x9b, 0xae, 0xb8, 0xb6, 0x7d, 0x96, 0xf8, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05,
0x4f, 0x26, 0xa1, 0xbd, 0x75, 0xc0, 0xc3, 0x92, 0x82, 0x5e, 0xfc, 0x0f, 0x00, 0x00, 0xff, 0xff,
0x8f, 0x56, 0x73, 0x78, 0xdc, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -834,14 +823,7 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if m.Index != 0 {
i = encodeVarintSubscribe(dAtA, i, uint64(m.Index))
i--
dAtA[i] = 0x10
}
if len(m.Key) > 0 {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0xa
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
@ -859,7 +841,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0
}
i--
dAtA[i] = 0x18
dAtA[i] = 0x10
return len(dAtA) - i, nil
}
func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
@ -875,7 +857,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro
dAtA[i] = 0
}
i--
dAtA[i] = 0x20
dAtA[i] = 0x18
return len(dAtA) - i, nil
}
func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) {
@ -894,7 +876,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i = encodeVarintSubscribe(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
dAtA[i] = 0x22
}
return len(dAtA) - i, nil
}
@ -1054,10 +1036,6 @@ func (m *Event) Size() (n int) {
}
var l int
_ = l
l = len(m.Key)
if l > 0 {
n += 1 + l + sovSubscribe(uint64(l))
}
if m.Index != 0 {
n += 1 + sovSubscribe(uint64(m.Index))
}
@ -1405,38 +1383,6 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSubscribe
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSubscribe
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSubscribe
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
}
@ -1455,7 +1401,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
break
}
}
case 3:
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType)
}
@ -1476,7 +1422,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
b := bool(v != 0)
m.Payload = &Event_EndOfSnapshot{b}
case 4:
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
}
@ -1497,7 +1443,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
}
b := bool(v != 0)
m.Payload = &Event_NewSnapshotToFollow{b}
case 5:
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)
}

View File

@ -85,34 +85,31 @@ message SubscribeRequest {
// describe the current "snapshot" of the result as well as ongoing mutations to
// that snapshot.
message Event {
// Key is the logical identifier for the entity that was mutated.
string Key = 1;
// Index is the raft index at which the mutation took place. At the top
// level of a subscription there will always be at most one Event per index.
// If multiple events are published to the same topic in a single raft
// transaction then the batch of events will be encoded inside a single
// top-level event to ensure they are delivered atomically to clients.
uint64 Index = 2;
uint64 Index = 1;
// Payload is the actual event content.
oneof Payload {
// EndOfSnapshot indicates the event stream for the initial snapshot has
// ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 3;
bool EndOfSnapshot = 2;
// NewSnapshotToFollow indicates that the client view is stale. The client
// must reset its view before handing any more events. Subsequent events
// in the stream will be for a new snapshot until an EndOfSnapshot event
// is received.
bool NewSnapshotToFollow = 4;
bool NewSnapshotToFollow = 3;
// EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft
// index (e.g. transactional updates). In this case the Topic and Index
// values of all events will match and the whole set should be delivered
// and consumed atomically.
EventBatch EventBatch = 5;
EventBatch EventBatch = 4;
// ServiceHealth is used for ServiceHealth and ServiceHealthConnect
// topics.