Update proto for change to stream framing events

This commit is contained in:
Daniel Nephin 2020-10-01 14:41:47 -04:00
parent 0769f54fe1
commit 754a1bbe24
2 changed files with 106 additions and 88 deletions

View File

@ -200,7 +200,7 @@ type Event struct {
// //
// Types that are valid to be assigned to Payload: // Types that are valid to be assigned to Payload:
// *Event_EndOfSnapshot // *Event_EndOfSnapshot
// *Event_EndOfEmptySnapshot // *Event_NewSnapshotToFollow
// *Event_EventBatch // *Event_EventBatch
// *Event_ServiceHealth // *Event_ServiceHealth
Payload isEvent_Payload `protobuf_oneof:"Payload"` Payload isEvent_Payload `protobuf_oneof:"Payload"`
@ -251,8 +251,8 @@ type isEvent_Payload interface {
type Event_EndOfSnapshot struct { type Event_EndOfSnapshot struct {
EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"` EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"`
} }
type Event_EndOfEmptySnapshot struct { type Event_NewSnapshotToFollow struct {
EndOfEmptySnapshot bool `protobuf:"varint,6,opt,name=EndOfEmptySnapshot,proto3,oneof"` NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"`
} }
type Event_EventBatch struct { type Event_EventBatch struct {
EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"` EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"`
@ -261,10 +261,10 @@ type Event_ServiceHealth struct {
ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"`
} }
func (*Event_EndOfSnapshot) isEvent_Payload() {} func (*Event_EndOfSnapshot) isEvent_Payload() {}
func (*Event_EndOfEmptySnapshot) isEvent_Payload() {} func (*Event_NewSnapshotToFollow) isEvent_Payload() {}
func (*Event_EventBatch) isEvent_Payload() {} func (*Event_EventBatch) isEvent_Payload() {}
func (*Event_ServiceHealth) isEvent_Payload() {} func (*Event_ServiceHealth) isEvent_Payload() {}
func (m *Event) GetPayload() isEvent_Payload { func (m *Event) GetPayload() isEvent_Payload {
if m != nil { if m != nil {
@ -301,9 +301,9 @@ func (m *Event) GetEndOfSnapshot() bool {
return false return false
} }
func (m *Event) GetEndOfEmptySnapshot() bool { func (m *Event) GetNewSnapshotToFollow() bool {
if x, ok := m.GetPayload().(*Event_EndOfEmptySnapshot); ok { if x, ok := m.GetPayload().(*Event_NewSnapshotToFollow); ok {
return x.EndOfEmptySnapshot return x.NewSnapshotToFollow
} }
return false return false
} }
@ -326,7 +326,7 @@ func (m *Event) GetServiceHealth() *ServiceHealthUpdate {
func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { func (*Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{ return _Event_OneofMarshaler, _Event_OneofUnmarshaler, _Event_OneofSizer, []interface{}{
(*Event_EndOfSnapshot)(nil), (*Event_EndOfSnapshot)(nil),
(*Event_EndOfEmptySnapshot)(nil), (*Event_NewSnapshotToFollow)(nil),
(*Event_EventBatch)(nil), (*Event_EventBatch)(nil),
(*Event_ServiceHealth)(nil), (*Event_ServiceHealth)(nil),
} }
@ -343,9 +343,9 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
} }
_ = b.EncodeVarint(5<<3 | proto.WireVarint) _ = b.EncodeVarint(5<<3 | proto.WireVarint)
_ = b.EncodeVarint(t) _ = b.EncodeVarint(t)
case *Event_EndOfEmptySnapshot: case *Event_NewSnapshotToFollow:
t := uint64(0) t := uint64(0)
if x.EndOfEmptySnapshot { if x.NewSnapshotToFollow {
t = 1 t = 1
} }
_ = b.EncodeVarint(6<<3 | proto.WireVarint) _ = b.EncodeVarint(6<<3 | proto.WireVarint)
@ -377,12 +377,12 @@ func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer)
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_EndOfSnapshot{x != 0} m.Payload = &Event_EndOfSnapshot{x != 0}
return true, err return true, err
case 6: // Payload.EndOfEmptySnapshot case 6: // Payload.NewSnapshotToFollow
if wire != proto.WireVarint { if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType return true, proto.ErrInternalBadWireType
} }
x, err := b.DecodeVarint() x, err := b.DecodeVarint()
m.Payload = &Event_EndOfEmptySnapshot{x != 0} m.Payload = &Event_NewSnapshotToFollow{x != 0}
return true, err return true, err
case 7: // Payload.EventBatch case 7: // Payload.EventBatch
if wire != proto.WireBytes { if wire != proto.WireBytes {
@ -412,7 +412,7 @@ func _Event_OneofSizer(msg proto.Message) (n int) {
case *Event_EndOfSnapshot: case *Event_EndOfSnapshot:
n += 1 // tag and wire n += 1 // tag and wire
n += 1 n += 1
case *Event_EndOfEmptySnapshot: case *Event_NewSnapshotToFollow:
n += 1 // tag and wire n += 1 // tag and wire
n += 1 n += 1
case *Event_EventBatch: case *Event_EventBatch:
@ -546,40 +546,40 @@ func init() {
func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) }
var fileDescriptor_ab3eb8c810e315fb = []byte{ var fileDescriptor_ab3eb8c810e315fb = []byte{
// 521 bytes of a gzipped FileDescriptorProto // 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x4f, 0x8f, 0xd2, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50,
0x14, 0xef, 0xc0, 0x02, 0xcb, 0xc3, 0xdd, 0xd4, 0x11, 0x63, 0xc3, 0x26, 0x0d, 0x12, 0xb3, 0xa9, 0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75,
0x9b, 0x48, 0x37, 0x98, 0xe8, 0x4d, 0x23, 0x2c, 0x8a, 0x31, 0x11, 0x53, 0xdc, 0x83, 0xde, 0x86, 0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97,
0xf6, 0x49, 0x1b, 0xd8, 0x99, 0xb1, 0x1d, 0x56, 0xb9, 0xfb, 0x21, 0xf6, 0xcb, 0x78, 0xf7, 0xe8, 0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47,
0x47, 0x30, 0xf8, 0x45, 0x0c, 0x43, 0xb7, 0x5b, 0x60, 0x6f, 0xde, 0xfa, 0x7e, 0x7f, 0xe6, 0xfd, 0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf,
0xf2, 0x5e, 0x1f, 0x3c, 0x94, 0xb1, 0x50, 0xc2, 0x95, 0xe3, 0x64, 0x3e, 0x4e, 0xfc, 0x38, 0x1a, 0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e,
0xa3, 0x9b, 0x7d, 0xb5, 0x35, 0x47, 0xab, 0x19, 0xd0, 0x68, 0x64, 0x6a, 0x8c, 0x2f, 0x23, 0x1f, 0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0,
0x5d, 0x2e, 0x82, 0x54, 0xd6, 0xba, 0x22, 0x60, 0x8e, 0xae, 0x95, 0x1e, 0x7e, 0x9d, 0x63, 0xa2, 0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c,
0xe8, 0x31, 0x94, 0x3e, 0x0a, 0x19, 0xf9, 0x16, 0x69, 0x12, 0xe7, 0xb0, 0x63, 0xb6, 0x6f, 0x1e, 0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed,
0xd7, 0xb8, 0xb7, 0xa6, 0xa9, 0x09, 0xc5, 0x77, 0xb8, 0xb0, 0x0a, 0x4d, 0xe2, 0x54, 0xbd, 0xd5, 0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67,
0x27, 0xad, 0xaf, 0x9c, 0x53, 0xe4, 0x56, 0x51, 0x63, 0xeb, 0x62, 0x85, 0xbe, 0xe5, 0x01, 0x7e, 0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87,
0xb7, 0xf6, 0x9a, 0xc4, 0xd9, 0xf3, 0xd6, 0x05, 0xb5, 0x01, 0xce, 0x98, 0x62, 0x3e, 0x72, 0x85, 0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c,
0xb1, 0x55, 0xd2, 0x86, 0x1c, 0xd2, 0xfa, 0x59, 0x80, 0x52, 0xff, 0x12, 0xf9, 0x7f, 0xe6, 0x59, 0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79,
0x77, 0x2e, 0xe6, 0x3b, 0x1f, 0xc3, 0x41, 0x9f, 0x07, 0xc3, 0x2f, 0x23, 0xce, 0x64, 0x12, 0x0a, 0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2,
0xa5, 0x9b, 0xef, 0x0f, 0x0c, 0x6f, 0x13, 0xa6, 0xa7, 0x40, 0x35, 0xd0, 0xbf, 0x90, 0x6a, 0x91, 0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc,
0x89, 0xcb, 0xa9, 0xf8, 0x16, 0x8e, 0x3e, 0x07, 0xd0, 0x91, 0xbb, 0x4c, 0xf9, 0xa1, 0x55, 0x69, 0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee,
0x12, 0xa7, 0xd6, 0xb9, 0x9f, 0x8b, 0x7b, 0x43, 0x0e, 0x0c, 0x2f, 0x27, 0xa5, 0xaf, 0xe1, 0x60, 0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c,
0xb4, 0xde, 0xce, 0x00, 0xd9, 0x4c, 0x85, 0x16, 0x68, 0xaf, 0x9d, 0xf3, 0x6e, 0xf0, 0xe7, 0x32, 0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99,
0x60, 0x0a, 0x57, 0x91, 0x37, 0xe0, 0x6e, 0x15, 0x2a, 0x1f, 0xd8, 0x62, 0x26, 0x58, 0xd0, 0x7a, 0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1,
0x96, 0xcf, 0x42, 0x1d, 0x28, 0xeb, 0x2a, 0xb1, 0x48, 0xb3, 0xe8, 0xd4, 0x36, 0x86, 0xa8, 0x09, 0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1,
0x2f, 0xe5, 0x5b, 0x3f, 0x08, 0xdc, 0xbb, 0xa5, 0x17, 0x7d, 0x04, 0x85, 0xa1, 0x4c, 0x57, 0x50, 0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a,
0xcf, 0xb9, 0x7b, 0x4c, 0xb1, 0x99, 0x98, 0x0c, 0xa5, 0x57, 0x18, 0x4a, 0xfa, 0x06, 0xcc, 0x5e, 0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30,
0x88, 0xfe, 0x34, 0x7d, 0xe1, 0xbd, 0x08, 0x50, 0x2f, 0xa4, 0xd6, 0x39, 0x6a, 0x67, 0x7f, 0x60, 0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad,
0x7b, 0x5b, 0xe2, 0xed, 0x98, 0x4e, 0x5e, 0xa5, 0x4b, 0xa7, 0x35, 0xa8, 0x9c, 0xf3, 0x29, 0x17, 0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b,
0xdf, 0xb8, 0x69, 0xd0, 0xbb, 0x5b, 0x73, 0x32, 0x09, 0xb5, 0xa0, 0xbe, 0x01, 0xf5, 0x04, 0xe7, 0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50,
0xe8, 0x2b, 0xb3, 0x70, 0xf2, 0x18, 0xaa, 0x59, 0x38, 0x7a, 0x07, 0xf6, 0x3d, 0x9c, 0x44, 0x89, 0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00,
0xc2, 0xd8, 0x34, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xd7, 0x35, 0xe9, 0x7c, 0x82, 0x07, 0x23, 0xc5, 0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a,
0x14, 0xf6, 0x42, 0xc6, 0x27, 0x98, 0x5e, 0x84, 0x54, 0x91, 0xe0, 0xf4, 0x05, 0x54, 0xb3, 0x0b, 0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70,
0xa1, 0x47, 0xf9, 0x85, 0x6c, 0xdd, 0x4d, 0x63, 0x67, 0xa6, 0x2d, 0xe3, 0x94, 0x74, 0x5f, 0xfe, 0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96,
0x5a, 0xda, 0xe4, 0xf7, 0xd2, 0x26, 0x7f, 0x96, 0x36, 0xb9, 0xfa, 0x6b, 0x1b, 0x9f, 0x9f, 0x4c, 0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9,
0x22, 0x15, 0xce, 0xc7, 0x6d, 0x5f, 0x5c, 0xb8, 0x21, 0x4b, 0xc2, 0xc8, 0x17, 0xb1, 0x74, 0x7d, 0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8,
0xc1, 0x93, 0xf9, 0xcc, 0xdd, 0x39, 0xed, 0x71, 0x59, 0x43, 0x4f, 0xff, 0x05, 0x00, 0x00, 0xff, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e,
0xff, 0x7d, 0xf7, 0xca, 0x01, 0xf6, 0x03, 0x00, 0x00, 0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -595,18 +595,24 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StateChangeSubscriptionClient interface { type StateChangeSubscriptionClient interface {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (StateChangeSubscription_SubscribeClient, error)
} }
@ -653,18 +659,24 @@ func (x *stateChangeSubscriptionSubscribeClient) Recv() (*Event, error) {
// StateChangeSubscriptionServer is the server API for StateChangeSubscription service. // StateChangeSubscriptionServer is the server API for StateChangeSubscription service.
type StateChangeSubscriptionServer interface { type StateChangeSubscriptionServer interface {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error Subscribe(*SubscribeRequest, StateChangeSubscription_SubscribeServer) error
} }
@ -842,14 +854,14 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0x28 dAtA[i] = 0x28
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *Event_EndOfEmptySnapshot) MarshalTo(dAtA []byte) (int, error) { func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) {
return m.MarshalToSizedBuffer(dAtA[:m.Size()]) return m.MarshalToSizedBuffer(dAtA[:m.Size()])
} }
func (m *Event_EndOfEmptySnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA) i := len(dAtA)
i-- i--
if m.EndOfEmptySnapshot { if m.NewSnapshotToFollow {
dAtA[i] = 1 dAtA[i] = 1
} else { } else {
dAtA[i] = 0 dAtA[i] = 0
@ -1058,7 +1070,7 @@ func (m *Event_EndOfSnapshot) Size() (n int) {
n += 2 n += 2
return n return n
} }
func (m *Event_EndOfEmptySnapshot) Size() (n int) { func (m *Event_NewSnapshotToFollow) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
} }
@ -1444,7 +1456,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
m.Payload = &Event_EndOfSnapshot{b} m.Payload = &Event_EndOfSnapshot{b}
case 6: case 6:
if wireType != 0 { if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndOfEmptySnapshot", wireType) return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType)
} }
var v int var v int
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -1462,7 +1474,7 @@ func (m *Event) Unmarshal(dAtA []byte) error {
} }
} }
b := bool(v != 0) b := bool(v != 0)
m.Payload = &Event_EndOfEmptySnapshot{b} m.Payload = &Event_NewSnapshotToFollow{b}
case 7: case 7:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType)

View File

@ -13,18 +13,24 @@ import "proto/pbservice/node.proto";
// state change events. Events are streamed as they happen. // state change events. Events are streamed as they happen.
service StateChangeSubscription { service StateChangeSubscription {
// Subscribe to a topic to receive events when there are changes to the topic. // Subscribe to a topic to receive events when there are changes to the topic.
// TODO: document how to handle framing events
// //
// If SubscribeRequest.Index is 0 the event stream will start with one or
// more snapshot events, followed by an EndOfSnapshot event. Subsequent
// events will be a live stream of events as they happen.
// //
// Subscribe may return an ABORTED status error to indicate the client must // If SubscribeRequest.Index is > 0 it is assumed the client already has a
// re-start the Subscribe call. // snapshot, and is trying to resume a stream that was disconnected. The
// client will either receive a NewSnapshotToFollow event, indicating the
// client view is stale and it must reset its view and prepare for a new
// snapshot. Or, if no NewSnapshotToFollow event is received, the client
// view is still fresh, and all events will be the live stream.
//
// Subscribe may return a gRPC status error with codes.ABORTED to indicate
// the client view is now stale due to a change on the server. The client
// must reset its view and issue a new Subscribe call to restart the stream.
// This error is used when the server can no longer correctly maintain the // This error is used when the server can no longer correctly maintain the
// stream, for example because the ACL permissions for the token changed // stream, for example because the ACL permissions for the token changed, or
// and the server doesn't know which previously delivered events should // because the server state was restored from a snapshot.
// now not be visible. Clients when receiving this must reset their
// local copy of the state to empty and start over from index 0 to get a
// valid snapshot again. Servers may also send this if their state store
// is restored from a snapshot.
rpc Subscribe(SubscribeRequest) returns (stream Event) {} rpc Subscribe(SubscribeRequest) returns (stream Event) {}
} }
@ -92,11 +98,11 @@ message Event {
// ended. Subsequent Events delivered will be mutations to that result. // ended. Subsequent Events delivered will be mutations to that result.
bool EndOfSnapshot = 5; bool EndOfSnapshot = 5;
// EndOfEmptySnapshot indicates that the client is still up-to-date. // NewSnapshotToFollow indicates that the client view is stale. The client
// The snapshot has ended, and was empty. The rest of the stream will be // must reset its view before handing any more events. Subsequent events
// individual update events. It distinguishes between "up to date, no snapshot" // in the stream will be for a new snapshot until an EndOfSnapshot event
// and "snapshot contains zero events but you should reset any old state to be blank". // is received.
bool EndOfEmptySnapshot = 6; bool NewSnapshotToFollow = 6;
// EventBatch is a set of events. This is typically used as the payload // EventBatch is a set of events. This is typically used as the payload
// type where multiple events are emitted in a single topic and raft // type where multiple events are emitted in a single topic and raft