stream: Use local types for Event Topic SubscriptionRequest

Daniel Nephin 2020-06-05 19:36:31 -04:00
parent 3d62013062
commit 2e45bbbb3e
9 changed files with 125 additions and 120 deletions

View File

@@ -9,7 +9,6 @@ import (
     "github.com/hashicorp/go-memdb"
     "golang.org/x/crypto/blake2b"

-    "github.com/hashicorp/consul/agent/agentpb"
     "github.com/hashicorp/consul/agent/consul/stream"
     "github.com/hashicorp/consul/agent/structs"
 )
@@ -38,20 +37,20 @@ type EventPublisher struct {
     // topicBuffers stores the head of the linked-list buffer to publish events to
     // for a topic.
-    topicBuffers map[agentpb.Topic]*stream.EventBuffer
+    topicBuffers map[stream.Topic]*stream.EventBuffer

     // snapCache stores the head of any snapshot buffers still in cache if caching
     // is enabled.
-    snapCache map[agentpb.Topic]map[string]*stream.EventSnapshot
+    snapCache map[stream.Topic]map[string]*stream.EventSnapshot

     // snapFns is the set of snapshot functions that were registered bound to the
     // state store.
-    snapFns map[agentpb.Topic]stream.SnapFn
+    snapFns map[stream.Topic]stream.SnapFn

     // subsByToken stores a list of Subscription objects outstanding indexed by a
     // hash of the ACL token they used to subscribe so we can reload them if their
     // ACL permissions change.
-    subsByToken map[string]map[*agentpb.SubscribeRequest]*stream.Subscription
+    subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription

     // commitCh decouples the Commit call in the FSM hot path from distributing
     // the resulting events.
@@ -59,8 +58,8 @@ type EventPublisher struct {
 }

 type commitUpdate struct {
-    tx *txnWrapper
-    events []agentpb.Event
+    tx *txn
+    events []stream.Event
 }

 func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Duration) *EventPublisher {
@@ -68,10 +67,10 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
         store: store,
         topicBufferSize: topicBufferSize,
         snapCacheTTL: snapCacheTTL,
-        topicBuffers: make(map[agentpb.Topic]*stream.EventBuffer),
-        snapCache: make(map[agentpb.Topic]map[string]*stream.EventSnapshot),
-        snapFns: make(map[agentpb.Topic]stream.SnapFn),
-        subsByToken: make(map[string]map[*agentpb.SubscribeRequest]*stream.Subscription),
+        topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
+        snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
+        snapFns: make(map[stream.Topic]stream.SnapFn),
+        subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
         commitCh: make(chan commitUpdate, 64),
     }
@@ -79,7 +78,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
     // TODO: document why
     for topic, handlers := range topicRegistry {
         fnCopy := handlers.Snapshot
-        e.snapFns[topic] = func(req *agentpb.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) {
+        e.snapFns[topic] = func(req *stream.SubscribeRequest, buf *stream.EventBuffer) (uint64, error) {
             return fnCopy(e.store, req, buf)
         }
     }
@@ -90,7 +89,7 @@ func NewEventPublisher(store *Store, topicBufferSize int, snapCacheTTL time.Dura
 }

 func (e *EventPublisher) publishChanges(tx *txn, changes memdb.Changes) error {
-    var events []agentpb.Event
+    var events []stream.Event
     for topic, th := range topicRegistry {
         if th.ProcessChanges != nil {
             es, err := th.ProcessChanges(e.store, tx, changes)
@@ -132,15 +131,15 @@ func (e *EventPublisher) sendEvents(update commitUpdate) {
     // implementation.
     defer update.tx.Abort()

-    eventsByTopic := make(map[agentpb.Topic][]agentpb.Event)
+    eventsByTopic := make(map[stream.Topic][]stream.Event)

     for _, event := range update.events {
         // If the event is an ACL update, treat it as a special case. Currently
         // ACL update events are only used internally to recognize when a subscriber
         // should reload its subscription.
-        if event.Topic == agentpb.Topic_ACLTokens ||
-            event.Topic == agentpb.Topic_ACLPolicies ||
-            event.Topic == agentpb.Topic_ACLRoles {
+        if event.Topic == stream.Topic_ACLTokens ||
+            event.Topic == stream.Topic_ACLPolicies ||
+            event.Topic == stream.Topic_ACLRoles {
            if err := e.handleACLUpdate(update.tx, event); err != nil {
                // This seems pretty drastic? What would be better. It's not super safe
@@ -173,15 +172,15 @@ func (e *EventPublisher) sendEvents(update commitUpdate) {
 // handleACLUpdate handles an ACL token/policy/role update. This method assumes
 // the lock is held.
-func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error {
+func (e *EventPublisher) handleACLUpdate(tx *txn, event stream.Event) error {
     switch event.Topic {
-    case agentpb.Topic_ACLTokens:
+    case stream.Topic_ACLTokens:
         token := event.GetACLToken()
         subs := e.subsByToken[secretHash(token.Token.SecretID)]
         for _, sub := range subs {
             sub.CloseReload()
         }
-    case agentpb.Topic_ACLPolicies:
+    case stream.Topic_ACLPolicies:
         policy := event.GetACLPolicy()
         // TODO(streaming) figure out how to thread method/ent meta here for
         // namespace support in Ent. Probably need wildcard here?
@@ -224,7 +223,7 @@ func (e *EventPublisher) handleACLUpdate(tx *txn, event agentpb.Event) error {
             }
         }
-    case agentpb.Topic_ACLRoles:
+    case stream.Topic_ACLRoles:
         role := event.GetACLRole()
         // TODO(streaming) figure out how to thread method/ent meta here for
         // namespace support in Ent.
@@ -264,8 +263,10 @@ func secretHash(token string) string {
 //
 // When the called is finished with the subscription for any reason, it must
 // call Unsubscribe to free ACL tracking resources.
-func (e *EventPublisher) Subscribe(ctx context.Context,
-    req *agentpb.SubscribeRequest) (*stream.Subscription, error) {
+func (e *EventPublisher) Subscribe(
+    ctx context.Context,
+    req *stream.SubscribeRequest,
+) (*stream.Subscription, error) {
     // Ensure we know how to make a snapshot for this topic
     _, ok := topicRegistry[req.Topic]
     if !ok {
@@ -291,11 +292,11 @@ func (e *EventPublisher) Subscribe(ctx context.Context,
     // client it's cache is still good. (note that this can be distinguished
     // from a legitimate empty snapshot due to the index matching the one the
     // client sent), then follow along from here in the topic.
-    e := agentpb.Event{
+    e := stream.Event{
         Index: req.Index,
         Topic: req.Topic,
         Key: req.Key,
-        Payload: &agentpb.Event_ResumeStream{ResumeStream: true},
+        Payload: &stream.Event_ResumeStream{ResumeStream: true},
     }
     // Make a new buffer to send to the client containing the resume.
     buf := stream.NewEventBuffer()
@@ -304,7 +305,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context,
     // starting point for the subscription.
     subHead := buf.Head()

-    buf.Append([]agentpb.Event{e})
+    buf.Append([]stream.Event{e})

     // Now splice the rest of the topic buffer on so the subscription will
     // continue to see future updates in the topic buffer.
@@ -327,7 +328,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context,
     tokenHash := secretHash(req.Token)
     subsByToken, ok := e.subsByToken[tokenHash]
     if !ok {
-        subsByToken = make(map[*agentpb.SubscribeRequest]*stream.Subscription)
+        subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription)
         e.subsByToken[tokenHash] = subsByToken
     }
     subsByToken[req] = sub
@@ -338,7 +339,7 @@ func (e *EventPublisher) Subscribe(ctx context.Context,
 // Unsubscribe must be called when a client is no longer interested in a
 // subscription to free resources monitoring changes in it's ACL token. The same
 // request object passed to Subscribe must be used.
-func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) {
+func (e *EventPublisher) Unsubscribe(req *stream.SubscribeRequest) {
     e.lock.Lock()
     defer e.lock.Unlock()
@@ -353,7 +354,7 @@ func (e *EventPublisher) Unsubscribe(req *agentpb.SubscribeRequest) {
     }
 }

-func (e *EventPublisher) getSnapshotLocked(req *agentpb.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) {
+func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHead *stream.BufferItem) (*stream.EventSnapshot, error) {
     // See if there is a cached snapshot
     topicSnaps, ok := e.snapCache[req.Topic]
     if !ok {
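
To see the shape of the new API, here is a minimal sketch of how a caller might drive the publisher with the local request type. It assumes it compiles inside the state package alongside EventPublisher as shown above; the topic, key, and handling logic are placeholders.

// exampleSubscribe is a hypothetical helper, not part of this commit.
func exampleSubscribe(ctx context.Context, e *EventPublisher) error {
    // The request now uses local stream types rather than agentpb.
    req := &stream.SubscribeRequest{
        Topic: stream.Topic_ServiceHealth,
        Key:   "web", // placeholder service name
        Token: "",    // ACL token secret, if any
        Index: 0,     // 0 requests a full snapshot rather than a resume
    }
    sub, err := e.Subscribe(ctx, req)
    if err != nil {
        return err
    }
    // The same request pointer must be passed back to release ACL tracking state.
    defer e.Unsubscribe(req)

    for {
        events, err := sub.Next()
        if err != nil {
            return err
        }
        for _, event := range events {
            _ = event // deliver each stream.Event to the client
        }
    }
}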

View File

@@ -8,8 +8,8 @@ import (
 // unboundSnapFn is a stream.SnapFn with state store as the first argument. This
 // is bound to a concrete state store instance in the EventPublisher on startup.
-type unboundSnapFn func(*Store, *agentpb.SubscribeRequest, *stream.EventBuffer) (uint64, error)
-type unboundProcessChangesFn func(*Store, *txnWrapper, memdb.Changes) ([]agentpb.Event, error)
+type unboundSnapFn func(*Store, *stream.SubscribeRequest, *stream.EventBuffer) (uint64, error)
+type unboundProcessChangesFn func(*Store, *txn, memdb.Changes) ([]stream.Event, error)

 // topicHandlers describes the methods needed to process a streaming
 // subscription for a given topic.
@@ -20,7 +20,7 @@ type topicHandlers struct {
 // topicRegistry is a map of topic handlers. It must only be written to during
 // init().
-var topicRegistry map[agentpb.Topic]topicHandlers
+var topicRegistry map[stream.Topic]topicHandlers

 func init() {
-    topicRegistry = map[agentpb.Topic]topicHandlers{
+    topicRegistry = map[stream.Topic]topicHandlers{

View File

@@ -0,0 +1,26 @@
+package stream
+
+type Topic int32
+
+// TODO: remove underscores
+const (
+    Topic_ServiceHealth Topic = 0
+    Topic_ServiceHealthConnect Topic = 1
+    Topic_ACLTokens Topic = 2
+    Topic_ACLPolicies Topic = 3
+    Topic_ACLRoles Topic = 4
+)
+
+// TODO:
+type Event struct {
+    Topic Topic
+    Key string
+    Index uint64
+    Payload interface{}
+}
+
+func (e Event) isEndOfSnapshot() bool {
+    return e.Payload == endOfSnapshot{}
+}
+
+type endOfSnapshot struct{}
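
Payload is now an untyped interface{}, so producers and consumers agree on payload types by convention, and end of snapshot is signalled with the unexported sentinel above. A small sketch of how code inside the stream package might build and inspect events; the key and string payload are arbitrary, mirroring the tests further down.

// exampleEvents is a hypothetical in-package sketch, not part of this commit.
func exampleEvents() (snapshotDone bool) {
    events := []Event{
        {
            Topic:   Topic_ServiceHealth,
            Key:     "web", // arbitrary key
            Index:   42,
            Payload: "test-event-042", // payloads are plain interface{} values
        },
        {
            Topic:   Topic_ServiceHealth,
            Key:     "web",
            Index:   42,
            Payload: endOfSnapshot{}, // sentinel recognized by isEndOfSnapshot
        },
    }
    for _, e := range events {
        if e.isEndOfSnapshot() {
            snapshotDone = true // live updates follow from this point
        }
    }
    return snapshotDone
}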

View File

@@ -4,21 +4,19 @@ import (
     "context"
     "errors"
     "sync/atomic"
-
-    "github.com/hashicorp/consul/agent/agentpb"
 )

 // EventBuffer is a single-writer, multiple-reader, unlimited length concurrent
 // buffer of events that have been published on a topic. The buffer is
 // effectively just the head of an atomically updated single-linked list. Atomic
 // accesses are usually to be suspected as premature optimization but this
-// specifc design has several important features that significantly simplify a
+// specific design has several important features that significantly simplify a
 // lot of our PubSub machinery.
 //
 // The Buffer itself only ever tracks the most recent set of events published so
 // if there are no consumers older events are automatically garbage collected.
 // Notification of new events is done by closing a channel on the previous head
-// alowing efficient broadcast to many watchers without having to run multile
+// allowing efficient broadcast to many watchers without having to run multiple
 // goroutines or deliver to O(N) separate channels.
 //
 // Because it's a linked list with atomically updated pointers, readers don't
@@ -35,7 +33,7 @@ import (
 // the first event in the buffer, we can cache the buffered events for future
 // watchers on the same topic. Finally, once we've delivered all the snapshot
 // events to the buffer, we can append a next-element which is the first topic
-// buffer element with a higher index and so consuers can just keep reading the
+// buffer element with a higher index and so consumers can just keep reading the
 // same buffer.
 //
 // A huge benefit here is that caching snapshots becomes very simple - we don't
@@ -78,7 +76,7 @@ func NewEventBuffer() *EventBuffer {
 // mutations to the events as they may have been exposed to subscribers in other
 // goroutines. Append only supports a single concurrent caller and must be
 // externally synchronized with other Append, AppendBuffer or AppendErr calls.
-func (b *EventBuffer) Append(events []agentpb.Event) {
+func (b *EventBuffer) Append(events []Event) {
     // Push events to the head
     it := NewBufferItem()
     it.Events = events
@@ -146,7 +144,7 @@ type BufferItem struct {
     // should check and skip nil Events at any point in the buffer. It will also
     // be nil if the producer appends an Error event because they can't complete
     // the request to populate the buffer. Err will be non-nil in this case.
-    Events []agentpb.Event
+    Events []Event

     // Err is non-nil if the producer can't complete their task and terminates the
     // buffer. Subscribers should return the error to clients and cease attempting
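
The broadcast mechanism described in the comments above, closing a channel on the previous head so that every blocked reader wakes at once, is easy to see in isolation. The following is a standalone sketch of that pattern under simplified assumptions (plain pointers instead of the buffer's atomic head, hypothetical names), not the real BufferItem.

package main

import (
    "fmt"
    "sync"
)

// item is a hypothetical stand-in for BufferItem: a payload, a link to the
// next item, and a channel that is closed once the next item exists.
type item struct {
    value string
    next  *item
    ready chan struct{}
}

func newItem(v string) *item {
    return &item{value: v, ready: make(chan struct{})}
}

// append links a new head and closes the old head's channel, which wakes
// every reader blocked on <-old.ready without any per-reader goroutines.
func (old *item) append(v string) *item {
    next := newItem(v)
    old.next = next
    close(old.ready)
    return next
}

func main() {
    head := newItem("snapshot")
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-head.ready // all readers block on the same channel
            fmt.Printf("reader %d saw %q\n", id, head.next.value)
        }(i)
    }
    head.append("update-1") // one close broadcasts to every reader
    wg.Wait()
}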

View File

@@ -8,13 +8,14 @@ import (
     time "time"

     "github.com/stretchr/testify/assert"
-
-    "github.com/hashicorp/consul/agent/agentpb"
 )

+// A property-based test to ensure that under heavy concurrent use trivial
+// correctness properties are not violated (and that -race doesn't complain).
 func TestEventBufferFuzz(t *testing.T) {
-    // A property-based test to ensure that under heavy concurrent use trivial
-    // correctness properties are not violated (and that -race doesn't complain).
+    if testing.Short() {
+        t.Skip("too slow for short run")
+    }

     nReaders := 1000
     nMessages := 1000
@@ -33,14 +34,11 @@ func TestEventBufferFuzz(t *testing.T) {
     for i := 0; i < nMessages; i++ {
         // Event content is arbitrary and not valid for our use of buffers in
         // streaming - here we only care about the semantics of the buffer.
-        e := agentpb.Event{
+        e := Event{
             Index: uint64(i), // Indexes should be contiguous
-            Topic: agentpb.Topic_ServiceHealth,
-            Payload: &agentpb.Event_EndOfSnapshot{
-                EndOfSnapshot: true,
-            },
+            Topic: Topic_ServiceHealth,
         }
-        b.Append([]agentpb.Event{e})
+        b.Append([]Event{e})
         // Sleep sometimes for a while to let some subscribers catch up
         wait := time.Duration(z.Uint64()) * time.Millisecond
         time.Sleep(wait)

View File

@@ -1,9 +1,5 @@
 package stream

-import (
-    "github.com/hashicorp/consul/agent/agentpb"
-)
-
 // EventSnapshot represents the state of memdb for a given topic and key at some
 // point in time. It is modelled as a buffer of events so that snapshots can be
 // streamed to possibly multiple subscribers concurrently, and can be trivially
@@ -13,7 +9,7 @@ import (
 // by Go's runtime, simplifying snapshot and buffer management dramatically.
 type EventSnapshot struct {
     // Request that this snapshot satisfies.
-    Request *agentpb.SubscribeRequest
+    Request *SubscribeRequest

     // Snap is the first item in the buffer containing the snapshot. Once the
     // snapshot is complete, subsequent update's BufferItems are appended such
@@ -35,7 +31,7 @@ type EventSnapshot struct {
 // SnapFn is the type of function needed to generate a snapshot for a topic and
 // key.
-type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error)
+type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error)

 // NewEventSnapshot creates a snapshot buffer based on the subscription request.
 // The current buffer head for the topic in question is passed so that once the
@@ -44,7 +40,7 @@ type SnapFn func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error
 // missed. Once the snapshot is delivered the topic buffer is spliced onto the
 // snapshot buffer so that subscribers will naturally follow from the snapshot
 // to wait for any subsequent updates.
-func NewEventSnapshot(req *agentpb.SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
+func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot {
     buf := NewEventBuffer()
     s := &EventSnapshot{
         Request: req,
@@ -68,7 +64,7 @@ func (s *EventSnapshot) doSnapshot() {
     }

     // We wrote the snapshot events to the buffer, send the "end of snapshot" event
-    s.snapBuffer.Append([]agentpb.Event{agentpb.Event{
+    s.snapBuffer.Append([]Event{{
         Topic: s.Request.Topic,
         Key: s.Request.Key,
         Index: idx,
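
Putting the pieces together, a snapshot is produced by a SnapFn that writes the current state into the buffer it is handed and returns the index that state represents; the live topic buffer head is then spliced on behind it. A compact sketch using only the signatures visible in this diff; the event contents and key are arbitrary.

// exampleSnapshot is a hypothetical in-package sketch, not part of this commit.
func exampleSnapshot() *EventSnapshot {
    // Live topic buffer whose head will be spliced onto the end of the snapshot.
    topicBuf := NewEventBuffer()

    // The snapshot function writes the current state as events and reports the
    // index it represents; here it emits a single arbitrary event at index 100.
    snapFn := func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
        buf.Append([]Event{{
            Topic:   req.Topic,
            Key:     req.Key,
            Index:   100,
            Payload: "snapshot-contents", // arbitrary payload for the sketch
        }})
        return 100, nil
    }

    req := &SubscribeRequest{Topic: Topic_ServiceHealth, Key: "web"}
    // Readers start at the returned snapshot's buffer: snapshot events first,
    // then the end-of-snapshot marker, then anything appended to topicBuf later.
    return NewEventSnapshot(req, topicBuf.Head(), snapFn)
}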

View File

@@ -6,14 +6,13 @@ import (
     "testing"
     time "time"

-    "github.com/hashicorp/consul/agent/agentpb"
     "github.com/stretchr/testify/require"
 )

 func TestEventSnapshot(t *testing.T) {
     // Setup a dummy state that we can manipulate easily. The properties we care
     // about are that we publish some sequence of events as a snapshot and then
-    // follow them up with "live updates". We control the interleavings. Our state
+    // follow them up with "live updates". We control the interleaving. Our state
     // consists of health events (only type fully defined so far) for service
     // instances with consecutive ID numbers starting from 0 (e.g. test-000,
     // test-001). The snapshot is delivered at index 1000. updatesBeforeSnap
@@ -85,12 +84,12 @@ func TestEventSnapshot(t *testing.T) {
                 // Use an instance index that's unique and should never appear in the
                 // output so we can be sure these were not included as they came before
                 // the snapshot.
-                tb.Append([]agentpb.Event{testHealthEvent(index, 10000+i)})
+                tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
             }

             // Create EventSnapshot, (will call snFn in another goroutine). The
             // Request is ignored by the SnapFn so doesn't matter for now.
-            es := NewEventSnapshot(&agentpb.SubscribeRequest{}, tbHead, snFn)
+            es := NewEventSnapshot(&SubscribeRequest{}, tbHead, snFn)

             // Deliver any post-snapshot events simulating updates that occur
             // logically after snapshot. It doesn't matter that these might actually
@@ -102,7 +101,7 @@ func TestEventSnapshot(t *testing.T) {
             for i := 0; i < tc.updatesAfterSnap; i++ {
                 index := snapIndex + 1 + uint64(i)
                 // Use an instance index that's unique.
-                tb.Append([]agentpb.Event{testHealthEvent(index, 20000+i)})
+                tb.Append([]Event{newDefaultHealthEvent(index, 20000+i)})
             }

             // Now read the snapshot buffer until we've received everything we expect.
@@ -123,20 +122,21 @@ func TestEventSnapshot(t *testing.T) {
                     "current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
                     snapIDs, updateIDs)
                 e := curItem.Events[0]
-                if snapDone {
-                    sh := e.GetServiceHealth()
-                    require.NotNil(t, sh, "want health event got: %#v", e.Payload)
-                    updateIDs = append(updateIDs, sh.CheckServiceNode.Service.ID)
+                switch {
+                case snapDone:
+                    payload, ok := e.Payload.(string)
+                    require.True(t, ok, "want health event got: %#v", e.Payload)
+                    updateIDs = append(updateIDs, payload)
                     if len(updateIDs) == tc.updatesAfterSnap {
                         // We're done!
                         break RECV
                     }
-                } else if e.GetEndOfSnapshot() {
+                case e.isEndOfSnapshot():
                     snapDone = true
-                } else {
-                    sh := e.GetServiceHealth()
-                    require.NotNil(t, sh, "want health event got: %#v", e.Payload)
-                    snapIDs = append(snapIDs, sh.CheckServiceNode.Service.ID)
+                default:
+                    payload, ok := e.Payload.(string)
+                    require.True(t, ok, "want health event got: %#v", e.Payload)
+                    snapIDs = append(snapIDs, payload)
                 }
             }
@@ -150,42 +150,27 @@ func TestEventSnapshot(t *testing.T) {
 func genSequentialIDs(start, end int) []string {
     ids := make([]string, 0, end-start)
     for i := start; i < end; i++ {
-        ids = append(ids, fmt.Sprintf("test-%03d", i))
+        ids = append(ids, fmt.Sprintf("test-event-%03d", i))
     }
     return ids
 }

 func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn {
-    return func(req *agentpb.SubscribeRequest, buf *EventBuffer) (uint64, error) {
+    return func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) {
         for i := 0; i < size; i++ {
             // Event content is arbitrary we are just using Health because it's the
             // first type defined. We just want a set of things with consecutive
             // names.
-            buf.Append([]agentpb.Event{testHealthEvent(index, i)})
+            buf.Append([]Event{newDefaultHealthEvent(index, i)})
         }
         return index, nil
     }
 }

-func testHealthEvent(index uint64, n int) agentpb.Event {
-    return agentpb.Event{
+func newDefaultHealthEvent(index uint64, n int) Event {
+    return Event{
         Index: index,
-        Topic: agentpb.Topic_ServiceHealth,
-        Payload: &agentpb.Event_ServiceHealth{
-            ServiceHealth: &agentpb.ServiceHealthUpdate{
-                Op: agentpb.CatalogOp_Register,
-                CheckServiceNode: &agentpb.CheckServiceNode{
-                    Node: &agentpb.Node{
-                        Node: "n1",
-                        Address: "10.10.10.10",
-                    },
-                    Service: &agentpb.NodeService{
-                        ID: fmt.Sprintf("test-%03d", n),
-                        Service: "test",
-                        Port: 8080,
-                    },
-                },
-            },
-        },
+        Topic: Topic_ServiceHealth,
+        Payload: fmt.Sprintf("test-event-%03d", n),
     }
 }

View File

@@ -4,8 +4,6 @@ import (
     context "context"
     "errors"
     "sync/atomic"
-
-    "github.com/hashicorp/consul/agent/agentpb"
 )

 const (
@@ -31,7 +29,7 @@ type Subscription struct {
     state uint32

     // req is the requests that we are responding to
-    req *agentpb.SubscribeRequest
+    req *SubscribeRequest

     // currentItem stores the current snapshot or topic buffer item we are on. It
     // is mutated by calls to Next.
@@ -46,8 +44,15 @@ type Subscription struct {
     cancelFn func()
 }

+type SubscribeRequest struct {
+    Topic Topic
+    Key string
+    Token string
+    Index uint64
+}
+
 // NewSubscription return a new subscription.
-func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *BufferItem) *Subscription {
+func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription {
     subCtx, cancel := context.WithCancel(ctx)
     return &Subscription{
         ctx: subCtx,
@@ -59,7 +64,7 @@ func NewSubscription(ctx context.Context, req *agentpb.SubscribeRequest, item *B
 // Next returns the next set of events to deliver. It must only be called from a
 // single goroutine concurrently as it mutates the Subscription.
-func (s *Subscription) Next() ([]agentpb.Event, error) {
+func (s *Subscription) Next() ([]Event, error) {
     state := atomic.LoadUint32(&s.state)
     if state == SubscriptionStateCloseReload {
         return nil, ErrSubscriptionReload
@@ -95,7 +100,7 @@ func (s *Subscription) Next() ([]agentpb.Event, error) {
         // as this is a hot loop.
         events := next.Events
         if !allMatch {
-            events = make([]agentpb.Event, 0, len(next.Events))
+            events = make([]Event, 0, len(next.Events))
             for _, e := range next.Events {
                 // Only return it if the key matches.
                 if s.req.Key == "" || s.req.Key == e.Key {
@@ -123,6 +128,6 @@ func (s *Subscription) CloseReload() {
 }

 // Request returns the request object that started the subscription.
-func (s *Subscription) Request() *agentpb.SubscribeRequest {
+func (s *Subscription) Request() *SubscribeRequest {
     return s.req
 }
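
Because CloseReload terminates Next with ErrSubscriptionReload instead of ending the stream normally, callers are expected to drop their state and subscribe again. A sketch of that consumer loop using only identifiers from this diff; the subscribe callback is a hypothetical stand-in for EventPublisher.Subscribe.

// consumeWithReload is a hypothetical in-package sketch, not part of this commit.
func consumeWithReload(ctx context.Context, subscribe func() (*Subscription, error)) error {
    for {
        if ctx.Err() != nil {
            return ctx.Err()
        }
        sub, err := subscribe()
        if err != nil {
            return err
        }
        for {
            events, err := sub.Next()
            if err == ErrSubscriptionReload {
                // The ACL token changed: discard local state and resubscribe.
                break
            }
            if err != nil {
                return err
            }
            for _, e := range events {
                _ = e // handle the Event; only events matching the request Key arrive
            }
        }
    }
}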

View File

@@ -5,7 +5,6 @@ import (
     "testing"
     time "time"

-    "github.com/hashicorp/consul/agent/agentpb"
     "github.com/stretchr/testify/require"
 )
@@ -17,14 +16,14 @@ func TestSubscription(t *testing.T) {
     startHead := eb.Head()

     // Start with an event in the buffer
-    testPublish(index, eb, "test")
+    publishTestEvent(index, eb, "test")

     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
     defer cancel()

     // Create a subscription
-    req := &agentpb.SubscribeRequest{
-        Topic: agentpb.Topic_ServiceHealth,
+    req := &SubscribeRequest{
+        Topic: Topic_ServiceHealth,
         Key: "test",
     }
     sub := NewSubscription(ctx, req, startHead)
@@ -43,7 +42,7 @@ func TestSubscription(t *testing.T) {
     index++
     start = time.Now()
     time.AfterFunc(200*time.Millisecond, func() {
-        testPublish(index, eb, "test")
+        publishTestEvent(index, eb, "test")
     })

     // Next call should block until event is delivered
@@ -60,9 +59,9 @@ func TestSubscription(t *testing.T) {
     // Event with wrong key should not be delivered. Deliver a good message right
     // so we don't have to block test thread forever or cancel func yet.
     index++
-    testPublish(index, eb, "nope")
+    publishTestEvent(index, eb, "nope")
     index++
-    testPublish(index, eb, "test")
+    publishTestEvent(index, eb, "test")

     start = time.Now()
     got, err = sub.Next()
@@ -97,14 +96,14 @@ func TestSubscriptionCloseReload(t *testing.T) {
     startHead := eb.Head()

     // Start with an event in the buffer
-    testPublish(index, eb, "test")
+    publishTestEvent(index, eb, "test")

     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
     defer cancel()

     // Create a subscription
-    req := &agentpb.SubscribeRequest{
-        Topic: agentpb.Topic_ServiceHealth,
+    req := &SubscribeRequest{
+        Topic: Topic_ServiceHealth,
         Key: "test",
     }
     sub := NewSubscription(ctx, req, startHead)
@@ -136,17 +135,14 @@ func TestSubscriptionCloseReload(t *testing.T) {
         "Reload should have been delivered after short time, took %s", elapsed)
 }

-func testPublish(index uint64, b *EventBuffer, key string) {
-    // Don't care about the event payload for now just the semantics of publising
+func publishTestEvent(index uint64, b *EventBuffer, key string) {
+    // Don't care about the event payload for now just the semantics of publishing
     // something. This is not a valid stream in the end-to-end streaming protocol
     // but enough to test subscription mechanics.
-    e := agentpb.Event{
+    e := Event{
         Index: index,
-        Topic: agentpb.Topic_ServiceHealth,
+        Topic: Topic_ServiceHealth,
         Key: key,
-        Payload: &agentpb.Event_EndOfSnapshot{
-            EndOfSnapshot: true,
-        },
     }
-    b.Append([]agentpb.Event{e})
+    b.Append([]Event{e})
 }