From 2e45bbbb3e4352b0bb2827df8ab1fb5642230f13 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 5 Jun 2020 19:36:31 -0400 Subject: [PATCH] stream: Use local types for Event Topic SubscriptionRequest --- agent/consul/state/event_publisher.go | 59 +++++++++++----------- agent/consul/state/stream_topics.go | 6 +-- agent/consul/stream/event.go | 26 ++++++++++ agent/consul/stream/event_buffer.go | 12 ++--- agent/consul/stream/event_buffer_test.go | 20 ++++---- agent/consul/stream/event_snapshot.go | 12 ++--- agent/consul/stream/event_snapshot_test.go | 59 ++++++++-------------- agent/consul/stream/subscription.go | 19 ++++--- agent/consul/stream/subscription_test.go | 32 +++++------- 9 files changed, 125 insertions(+), 120 deletions(-) create mode 100644 agent/consul/stream/event.go diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 7b9520829..277e03032 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/go-memdb" "golang.org/x/crypto/blake2b" - "github.com/hashicorp/consul/agent/agentpb" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" ) @@ -38,20 +37,20 @@ type EventPublisher struct { // topicBuffers stores the head of the linked-list buffer to publish events to // for a topic. - topicBuffers map[agentpb.Topic]*stream.EventBuffer + topicBuffers map[stream.Topic]*stream.EventBuffer // snapCache stores the head of any snapshot buffers still in cache if caching // is enabled. - snapCache map[agentpb.Topic]map[string]*stream.EventSnapshot + snapCache map[stream.Topic]map[string]*stream.EventSnapshot // snapFns is the set of snapshot functions that were registered bound to the // state store. - snapFns map[agentpb.Topic]stream.SnapFn + snapFns map[stream.Topic]stream.SnapFn // subsByToken stores a list of Subscription objects outstanding indexed by a // hash of the ACL token they used to subscribe so we can reload them if their // ACL permissions change. - subsByToken map[string]map[*agentpb.SubscribeRequest]*stream.Subscription + subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription // commitCh decouples the Commit call in the FSM hot path from distributing // the resulting events. @@ -59,8 +58,8 @@ type EventPublisher struct { } type commitUpdate struct { - tx *txnWrapper - events []agentpb.Event + tx *txn + events []stream.Event } func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher { @@ -68,10 +67,10 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura store: store, topicBufferSize: topicBufferSize, snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[agentpb.Topic]*stream.EventBuffer), - snapCache: make(map[agentpb.Topic]map[string]*stream.EventSnapshot), - snapFns: make(map[agentpb.Topic]stream.SnapFn), - subsByToken: make(map[string]map[*agentpb.SubscribeRequest]*stream.Subscription), + topicBuffers: make(map[stream.Topic]*stream.EventBuffer), + snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), + snapFns: make(map[stream.Topic]stream.SnapFn), + subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription), commitCh: make(chan commitUpdate, 64), } @@ -79,7 +78,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura // TODO: document why for topic, handlers := range topicRegistry { fnCopy := handlers.Snapshot - e.snapFns[topic] = func(req *agentpb.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) { + e.snapFns[topic] = func(req *stream.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) { return fnCopy(e.store, req, buf) } } @@ -90,7 +89,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura } func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error { - var events []agentpb.Event + var events []stream.Event for topic, th := range topicRegistry { if th.ProcessChanges != nil { es, err := th.ProcessChanges(e.store, tx, changes) @@ -132,15 +131,15 @@ func (e *EventPublisher) sendEvents(update commitUpdate) { // implementation. defer update.tx.Abort() - eventsByTopic := make(map[agentpb.Topic][]agentpb.Event) + eventsByTopic := make(map[stream.Topic][]stream.Event) for _, event := range update.events { // If the event is an ACL update, treat it as a special case. Currently // ACL update events are only used internally to recognize when a subscriber // should reload its subscription. - if event.Topic == agentpb.Topic_ACLTokens || - event.Topic == agentpb.Topic_ACLPolicies || - event.Topic == agentpb.Topic_ACLRoles { + if event.Topic == stream.Topic_ACLTokens || + event.Topic == stream.Topic_ACLPolicies || + event.Topic == stream.Topic_ACLRoles { if err := e.handleACLUpdate(update.tx, event); err != nil { // This seems pretty drastic? What would be better. It's not super safe @@ -173,15 +172,15 @@ func (e *EventPublisher) sendEvents(update commitUpdate) { // handleACLUpdate handles an ACL token/policy/role update. This method assumes // the lock is held. -func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error { +func (e *EventPublisher) handleACLUpdate(tx *txn, event stream.Event) error { switch event.Topic { - case agentpb.Topic_ACLTokens: + case stream.Topic_ACLTokens: token := event.GetACLToken() subs := e.subsByToken[secretHash(token.Token.SecretID)] for _, sub := range subs { sub.CloseReload() } - case agentpb.Topic_ACLPolicies: + case stream.Topic_ACLPolicies: policy := event.GetACLPolicy() // TODO(streaming) figure out how to thread method/ent meta here for // namespace support in Ent. Probably need wildcard here? @@ -224,7 +223,7 @@ func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error { } } - case agentpb.Topic_ACLRoles: + case stream.Topic_ACLRoles: role := event.GetACLRole() // TODO(streaming) figure out how to thread method/ent meta here for // namespace support in Ent. @@ -264,8 +263,10 @@ func secretHash(token string) string { // // When the called is finished with the subscription for any reason, it must // call Unsubscribe to free ACL tracking resources. -func (e *EventPublisher) Subscribe(ctx context.Context, - req *agentpb.SubscribeRequest) (*stream.Subscription, error) { +func (e *EventPublisher) Subscribe( + ctx context.Context, + req *stream.SubscribeRequest, +) (*stream.Subscription, error) { // Ensure we know how to make a snapshot for this topic _, ok := topicRegistry[req.Topic] if !ok { @@ -291,11 +292,11 @@ func (e *EventPublisher) Subscribe(ctx context.Context, // client it's cache is still good. (note that this can be distinguished // from a legitimate empty snapshot due to the index matching the one the // client sent), then follow along from here in the topic. - e := agentpb.Event{ + e := stream.Event{ Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: &agentpb.Event_ResumeStream{ResumeStream: true}, + Payload: &stream.Event_ResumeStream{ResumeStream: true}, } // Make a new buffer to send to the client containing the resume. buf := stream.NewEventBuffer() @@ -304,7 +305,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context, // starting point for the subscription. subHead := buf.Head() - buf.Append([]agentpb.Event{e}) + buf.Append([]stream.Event{e}) // Now splice the rest of the topic buffer on so the subscription will // continue to see future updates in the topic buffer. @@ -327,7 +328,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context, tokenHash := secretHash(req.Token) subsByToken, ok := e.subsByToken[tokenHash] if !ok { - subsByToken = make(map[*agentpb.SubscribeRequest]*stream.Subscription) + subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) e.subsByToken[tokenHash] = subsByToken } subsByToken[req] = sub @@ -338,7 +339,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context, // Unsubscribe must be called when a client is no longer interested in a // subscription to free resources monitoring changes in it's ACL token. The same // request object passed to Subscribe must be used. -func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) { +func (e *EventPublisher) Unsubscribe(req *stream.SubscribeRequest) { e.lock.Lock() defer e.lock.Unlock() @@ -353,7 +354,7 @@ func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) { } } -func (e *EventPublisher) getSnapshotLocked(req *agentpb.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { +func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) { // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index a4834da3e..06ef5e002 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -8,8 +8,8 @@ import ( // unboundSnapFn is a stream.SnapFn with state store as the first argument. This // is bound to a concrete state store instance in the EventPublisher on startup. -type unboundSnapFn func(*Store, *agentpb.SubscribeRequest, *stream.EventBuffer) (uint64, error) -type unboundProcessChangesFn func(*Store, *txnWrapper, memdb.Changes) ([]agentpb.Event, error) +type unboundSnapFn func(*Store, *stream.SubscribeRequest, *stream.EventBuffer) (uint64, error) +type unboundProcessChangesFn func(*Store, *txn, memdb.Changes) ([]stream.Event, error) // topicHandlers describes the methods needed to process a streaming // subscription for a given topic. @@ -20,7 +20,7 @@ type topicHandlers struct { // topicRegistry is a map of topic handlers. It must only be written to during // init(). -var topicRegistry map[agentpb.Topic]topicHandlers +var topicRegistry map[stream.Topic]topicHandlers func init() { topicRegistry = map[agentpb.Topic]topicHandlers{ diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go new file mode 100644 index 000000000..6fd83855b --- /dev/null +++ b/agent/consul/stream/event.go @@ -0,0 +1,26 @@ +package stream + +type Topic int32 + +// TODO: remove underscores +const ( + Topic_ServiceHealth Topic = 0 + Topic_ServiceHealthConnect Topic = 1 + Topic_ACLTokens Topic = 2 + Topic_ACLPolicies Topic = 3 + Topic_ACLRoles Topic = 4 +) + +// TODO: +type Event struct { + Topic Topic + Key string + Index uint64 + Payload interface{} +} + +func (e Event) isEndOfSnapshot() bool { + return e.Payload == endOfSnapshot{} +} + +type endOfSnapshot struct{} diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go index aa6a2c89c..830b30234 100644 --- a/agent/consul/stream/event_buffer.go +++ b/agent/consul/stream/event_buffer.go @@ -4,21 +4,19 @@ import ( "context" "errors" "sync/atomic" - - "github.com/hashicorp/consul/agent/agentpb" ) // EventBuffer is a single-writer, multiple-reader, unlimited length concurrent // buffer of events that have been published on a topic. The buffer is // effectively just the head of an atomically updated single-linked list. Atomic // accesses are usually to be suspected as premature optimization but this -// specifc design has several important features that significantly simplify a +// specific design has several important features that significantly simplify a // lot of our PubSub machinery. // // The Buffer itself only ever tracks the most recent set of events published so // if there are no consumers older events are automatically garbage collected. // Notification of new events is done by closing a channel on the previous head -// alowing efficient broadcast to many watchers without having to run multile +// allowing efficient broadcast to many watchers without having to run multiple // goroutines or deliver to O(N) separate channels. // // Because it's a linked list with atomically updated pointers, readers don't @@ -35,7 +33,7 @@ import ( // the first event in the buffer, we can cache the buffered events for future // watchers on the same topic. Finally, once we've delivered all the snapshot // events to the buffer, we can append a next-element which is the first topic -// buffer element with a higher index and so consuers can just keep reading the +// buffer element with a higher index and so consumers can just keep reading the // same buffer. // // A huge benefit here is that caching snapshots becomes very simple - we don't @@ -78,7 +76,7 @@ func NewEventBuffer() *EventBuffer { // mutations to the events as they may have been exposed to subscribers in other // goroutines. Append only supports a single concurrent caller and must be // externally synchronized with other Append, AppendBuffer or AppendErr calls. -func (b *EventBuffer) Append(events []agentpb.Event) { +func (b *EventBuffer) Append(events []Event) { // Push events to the head it := NewBufferItem() it.Events = events @@ -146,7 +144,7 @@ type BufferItem struct { // should check and skip nil Events at any point in the buffer. It will also // be nil if the producer appends an Error event because they can't complete // the request to populate the buffer. Err will be non-nil in this case. - Events []agentpb.Event + Events []Event // Err is non-nil if the producer can't complete their task and terminates the // buffer. Subscribers should return the error to clients and cease attempting diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go index 408ecdeab..eadd63816 100644 --- a/agent/consul/stream/event_buffer_test.go +++ b/agent/consul/stream/event_buffer_test.go @@ -8,13 +8,14 @@ import ( time "time" "github.com/stretchr/testify/assert" - - "github.com/hashicorp/consul/agent/agentpb" ) +// A property-based test to ensure that under heavy concurrent use trivial +// correctness properties are not violated (and that -race doesn't complain). func TestEventBufferFuzz(t *testing.T) { - // A property-based test to ensure that under heavy concurrent use trivial - // correctness properties are not violated (and that -race doesn't complain). + if testing.Short() { + t.Skip("too slow for short run") + } nReaders := 1000 nMessages := 1000 @@ -33,14 +34,11 @@ func TestEventBufferFuzz(t *testing.T) { for i := 0; i < nMessages; i++ { // Event content is arbitrary and not valid for our use of buffers in // streaming - here we only care about the semantics of the buffer. - e := agentpb.Event{ - Index: uint64(i), // Indexes should be contiguous - Topic: agentpb.Topic_ServiceHealth, - Payload: &agentpb.Event_EndOfSnapshot{ - EndOfSnapshot: true, - }, + e := Event{ + Index: uint64(i), // Indexes should be contiguous + Topic: Topic_ServiceHealth, } - b.Append([]agentpb.Event{e}) + b.Append([]Event{e}) // Sleep sometimes for a while to let some subscribers catch up wait := time.Duration(z.Uint64()) * time.Millisecond time.Sleep(wait) diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index bba68397f..520ac8e60 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -1,9 +1,5 @@ package stream -import ( - "github.com/hashicorp/consul/agent/agentpb" -) - // EventSnapshot represents the state of memdb for a given topic and key at some // point in time. It is modelled as a buffer of events so that snapshots can be // streamed to possibly multiple subscribers concurrently, and can be trivially @@ -13,7 +9,7 @@ import ( // by Go's runtime, simplifying snapshot and buffer management dramatically. type EventSnapshot struct { // Request that this snapshot satisfies. - Request *agentpb.SubscribeRequest + Request *SubscribeRequest // Snap is the first item in the buffer containing the snapshot. Once the // snapshot is complete, subsequent update's BufferItems are appended such @@ -35,7 +31,7 @@ type EventSnapshot struct { // SnapFn is the type of function needed to generate a snapshot for a topic and // key. -type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) +type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) // NewEventSnapshot creates a snapshot buffer based on the subscription request. // The current buffer head for the topic in question is passed so that once the @@ -44,7 +40,7 @@ type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error // missed. Once the snapshot is delivered the topic buffer is spliced onto the // snapshot buffer so that subscribers will naturally follow from the snapshot // to wait for any subsequent updates. -func NewEventSnapshot(req *agentpb.SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { +func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { buf := NewEventBuffer() s := &EventSnapshot{ Request: req, @@ -68,7 +64,7 @@ func (s *EventSnapshot) doSnapshot() { } // We wrote the snapshot events to the buffer, send the "end of snapshot" event - s.snapBuffer.Append([]agentpb.Event{agentpb.Event{ + s.snapBuffer.Append([]Event{{ Topic: s.Request.Topic, Key: s.Request.Key, Index: idx, diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 685812fae..4d6f4e6df 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -6,14 +6,13 @@ import ( "testing" time "time" - "github.com/hashicorp/consul/agent/agentpb" "github.com/stretchr/testify/require" ) func TestEventSnapshot(t *testing.T) { // Setup a dummy state that we can manipulate easily. The properties we care // about are that we publish some sequence of events as a snapshot and then - // follow them up with "live updates". We control the interleavings. Our state + // follow them up with "live updates". We control the interleaving. Our state // consists of health events (only type fully defined so far) for service // instances with consecutive ID numbers starting from 0 (e.g. test-000, // test-001). The snapshot is delivered at index 1000. updatesBeforeSnap @@ -85,12 +84,12 @@ func TestEventSnapshot(t *testing.T) { // Use an instance index that's unique and should never appear in the // output so we can be sure these were not included as they came before // the snapshot. - tb.Append([]agentpb.Event{testHealthEvent(index, 10000+i)}) + tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)}) } // Create EventSnapshot, (will call snFn in another goroutine). The // Request is ignored by the SnapFn so doesn't matter for now. - es := NewEventSnapshot(&agentpb.SubscribeRequest{}, tbHead, snFn) + es := NewEventSnapshot(&SubscribeRequest{}, tbHead, snFn) // Deliver any post-snapshot events simulating updates that occur // logically after snapshot. It doesn't matter that these might actually @@ -102,7 +101,7 @@ func TestEventSnapshot(t *testing.T) { for i := 0; i < tc.updatesAfterSnap; i++ { index := snapIndex + 1 + uint64(i) // Use an instance index that's unique. - tb.Append([]agentpb.Event{testHealthEvent(index, 20000+i)}) + tb.Append([]Event{newDefaultHealthEvent(index, 20000+i)}) } // Now read the snapshot buffer until we've received everything we expect. @@ -123,20 +122,21 @@ func TestEventSnapshot(t *testing.T) { "current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone, snapIDs, updateIDs) e := curItem.Events[0] - if snapDone { - sh := e.GetServiceHealth() - require.NotNil(t, sh, "want health event got: %#v", e.Payload) - updateIDs = append(updateIDs, sh.CheckServiceNode.Service.ID) + switch { + case snapDone: + payload, ok := e.Payload.(string) + require.True(t, ok, "want health event got: %#v", e.Payload) + updateIDs = append(updateIDs, payload) if len(updateIDs) == tc.updatesAfterSnap { // We're done! break RECV } - } else if e.GetEndOfSnapshot() { + case e.isEndOfSnapshot(): snapDone = true - } else { - sh := e.GetServiceHealth() - require.NotNil(t, sh, "want health event got: %#v", e.Payload) - snapIDs = append(snapIDs, sh.CheckServiceNode.Service.ID) + default: + payload, ok := e.Payload.(string) + require.True(t, ok, "want health event got: %#v", e.Payload) + snapIDs = append(snapIDs, payload) } } @@ -150,42 +150,27 @@ func TestEventSnapshot(t *testing.T) { func genSequentialIDs(start, end int) []string { ids := make([]string, 0, end-start) for i := start; i < end; i++ { - ids = append(ids, fmt.Sprintf("test-%03d", i)) + ids = append(ids, fmt.Sprintf("test-event-%03d", i)) } return ids } func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn { - return func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) { + return func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) { for i := 0; i < size; i++ { // Event content is arbitrary we are just using Health because it's the // first type defined. We just want a set of things with consecutive // names. - buf.Append([]agentpb.Event{testHealthEvent(index, i)}) + buf.Append([]Event{newDefaultHealthEvent(index, i)}) } return index, nil } } -func testHealthEvent(index uint64, n int) agentpb.Event { - return agentpb.Event{ - Index: index, - Topic: agentpb.Topic_ServiceHealth, - Payload: &agentpb.Event_ServiceHealth{ - ServiceHealth: &agentpb.ServiceHealthUpdate{ - Op: agentpb.CatalogOp_Register, - CheckServiceNode: &agentpb.CheckServiceNode{ - Node: &agentpb.Node{ - Node: "n1", - Address: "10.10.10.10", - }, - Service: &agentpb.NodeService{ - ID: fmt.Sprintf("test-%03d", n), - Service: "test", - Port: 8080, - }, - }, - }, - }, +func newDefaultHealthEvent(index uint64, n int) Event { + return Event{ + Index: index, + Topic: Topic_ServiceHealth, + Payload: fmt.Sprintf("test-event-%03d", n), } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 42c396a13..5ca8eb75e 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -4,8 +4,6 @@ import ( context "context" "errors" "sync/atomic" - - "github.com/hashicorp/consul/agent/agentpb" ) const ( @@ -31,7 +29,7 @@ type Subscription struct { state uint32 // req is the requests that we are responding to - req *agentpb.SubscribeRequest + req *SubscribeRequest // currentItem stores the current snapshot or topic buffer item we are on. It // is mutated by calls to Next. @@ -46,8 +44,15 @@ type Subscription struct { cancelFn func() } +type SubscribeRequest struct { + Topic Topic + Key string + Token string + Index uint64 +} + // NewSubscription return a new subscription. -func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *BufferItem) *Subscription { +func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription { subCtx, cancel := context.WithCancel(ctx) return &Subscription{ ctx: subCtx, @@ -59,7 +64,7 @@ func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *B // Next returns the next set of events to deliver. It must only be called from a // single goroutine concurrently as it mutates the Subscription. -func (s *Subscription) Next() ([]agentpb.Event, error) { +func (s *Subscription) Next() ([]Event, error) { state := atomic.LoadUint32(&s.state) if state == SubscriptionStateCloseReload { return nil, ErrSubscriptionReload @@ -95,7 +100,7 @@ func (s *Subscription) Next() ([]agentpb.Event, error) { // as this is a hot loop. events := next.Events if !allMatch { - events = make([]agentpb.Event, 0, len(next.Events)) + events = make([]Event, 0, len(next.Events)) for _, e := range next.Events { // Only return it if the key matches. if s.req.Key == "" || s.req.Key == e.Key { @@ -123,6 +128,6 @@ func (s *Subscription) CloseReload() { } // Request returns the request object that started the subscription. -func (s *Subscription) Request() *agentpb.SubscribeRequest { +func (s *Subscription) Request() *SubscribeRequest { return s.req } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 437d3872c..56de6958f 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -5,7 +5,6 @@ import ( "testing" time "time" - "github.com/hashicorp/consul/agent/agentpb" "github.com/stretchr/testify/require" ) @@ -17,14 +16,14 @@ func TestSubscription(t *testing.T) { startHead := eb.Head() // Start with an event in the buffer - testPublish(index, eb, "test") + publishTestEvent(index, eb, "test") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Create a subscription - req := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + req := &SubscribeRequest{ + Topic: Topic_ServiceHealth, Key: "test", } sub := NewSubscription(ctx, req, startHead) @@ -43,7 +42,7 @@ func TestSubscription(t *testing.T) { index++ start = time.Now() time.AfterFunc(200*time.Millisecond, func() { - testPublish(index, eb, "test") + publishTestEvent(index, eb, "test") }) // Next call should block until event is delivered @@ -60,9 +59,9 @@ func TestSubscription(t *testing.T) { // Event with wrong key should not be delivered. Deliver a good message right // so we don't have to block test thread forever or cancel func yet. index++ - testPublish(index, eb, "nope") + publishTestEvent(index, eb, "nope") index++ - testPublish(index, eb, "test") + publishTestEvent(index, eb, "test") start = time.Now() got, err = sub.Next() @@ -97,14 +96,14 @@ func TestSubscriptionCloseReload(t *testing.T) { startHead := eb.Head() // Start with an event in the buffer - testPublish(index, eb, "test") + publishTestEvent(index, eb, "test") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Create a subscription - req := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + req := &SubscribeRequest{ + Topic: Topic_ServiceHealth, Key: "test", } sub := NewSubscription(ctx, req, startHead) @@ -136,17 +135,14 @@ func TestSubscriptionCloseReload(t *testing.T) { "Reload should have been delivered after short time, took %s", elapsed) } -func testPublish(index uint64, b *EventBuffer, key string) { - // Don't care about the event payload for now just the semantics of publising +func publishTestEvent(index uint64, b *EventBuffer, key string) { + // Don't care about the event payload for now just the semantics of publishing // something. This is not a valid stream in the end-to-end streaming protocol // but enough to test subscription mechanics. - e := agentpb.Event{ + e := Event{ Index: index, - Topic: agentpb.Topic_ServiceHealth, + Topic: Topic_ServiceHealth, Key: key, - Payload: &agentpb.Event_EndOfSnapshot{ - EndOfSnapshot: true, - }, } - b.Append([]agentpb.Event{e}) + b.Append([]Event{e}) }