stream: change Topic to an interface

Consumers of the package can decide on which type to use for the Topic. In the future we may
use a gRPC type for the topic.
This commit is contained in:
Daniel Nephin 2020-07-06 20:04:24 -04:00
parent aa571bd0ce
commit 16a2b3fafc
8 changed files with 28 additions and 18 deletions

View File

@ -140,6 +140,13 @@ func (tx *txn) Commit() error {
return nil return nil
} }
// TODO: may be replaced by a gRPC type.
type topic string
func (t topic) String() string {
return string(t)
}
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add other table handlers here. // TODO: add other table handlers here.
return aclChangeUnsubscribeEvent(tx, changes) return aclChangeUnsubscribeEvent(tx, changes)

View File

@ -369,7 +369,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
} }
} }
var topicService stream.Topic = 901 var topicService stream.Topic = topic("test-topic-service")
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{

View File

@ -4,17 +4,14 @@ to the state store.
*/ */
package stream package stream
type Topic int32 import "fmt"
// TODO: remove underscores // Topic is an identifier that partitions events. A subscription will only receive
// TODO: type string instead of int? // events which match the Topic.
// TODO: move topics to state package? type Topic fmt.Stringer
const (
Topic_ServiceHealth Topic = 1
Topic_ServiceHealthConnect Topic = 2
)
// TODO: // Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.
type Event struct { type Event struct {
Topic Topic Topic Topic
Key string Key string

View File

@ -38,7 +38,7 @@ func TestEventBufferFuzz(t *testing.T) {
// streaming - here we only care about the semantics of the buffer. // streaming - here we only care about the semantics of the buffer.
e := Event{ e := Event{
Index: uint64(i), // Indexes should be contiguous Index: uint64(i), // Indexes should be contiguous
Topic: Topic_ServiceHealth, Topic: testTopic,
} }
b.Append([]Event{e}) b.Append([]Event{e})
// Sleep sometimes for a while to let some subscribers catch up // Sleep sometimes for a while to let some subscribers catch up

View File

@ -169,7 +169,7 @@ func (e *EventPublisher) Subscribe(
// Ensure we know how to make a snapshot for this topic // Ensure we know how to make a snapshot for this topic
_, ok := e.snapshotHandlers[req.Topic] _, ok := e.snapshotHandlers[req.Topic]
if !ok { if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %v", req.Topic)
} }
e.lock.Lock() e.lock.Lock()
@ -286,7 +286,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *buf
handler, ok := e.snapshotHandlers[req.Topic] handler, ok := e.snapshotHandlers[req.Topic]
if !ok { if !ok {
return nil, fmt.Errorf("unknown topic %d", req.Topic) return nil, fmt.Errorf("unknown topic %v", req.Topic)
} }
snap = newEventSnapshot(req, topicHead, handler) snap = newEventSnapshot(req, topicHead, handler)

View File

@ -9,7 +9,13 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var testTopic Topic = 999 type intTopic int
func (i intTopic) String() string {
return fmt.Sprintf("%d", i)
}
var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{ subscription := &SubscribeRequest{

View File

@ -170,7 +170,7 @@ func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
func newDefaultHealthEvent(index uint64, n int) Event { func newDefaultHealthEvent(index uint64, n int) Event {
return Event{ return Event{
Index: index, Index: index,
Topic: Topic_ServiceHealth, Topic: testTopic,
Payload: fmt.Sprintf("test-event-%03d", n), Payload: fmt.Sprintf("test-event-%03d", n),
} }
} }

View File

@ -23,7 +23,7 @@ func TestSubscription(t *testing.T) {
// Create a subscription // Create a subscription
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: Topic_ServiceHealth, Topic: testTopic,
Key: "test", Key: "test",
} }
sub := newSubscription(ctx, req, startHead) sub := newSubscription(ctx, req, startHead)
@ -103,7 +103,7 @@ func TestSubscription_Close(t *testing.T) {
// Create a subscription // Create a subscription
req := &SubscribeRequest{ req := &SubscribeRequest{
Topic: Topic_ServiceHealth, Topic: testTopic,
Key: "test", Key: "test",
} }
sub := newSubscription(ctx, req, startHead) sub := newSubscription(ctx, req, startHead)
@ -141,7 +141,7 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
// but enough to test subscription mechanics. // but enough to test subscription mechanics.
e := Event{ e := Event{
Index: index, Index: index,
Topic: Topic_ServiceHealth, Topic: testTopic,
Key: key, Key: key,
} }
b.Append([]Event{e}) b.Append([]Event{e})