fddac3af00
Adds an event buffer to hold events from raft changes; update events to use event buffer; fix append call; provide way to prune buffer items after TTL; event publisher tests; basic publish test; wire up max item TTL; rename package to stream, clean up exploratory work; subscription filtering; subscription plumbing; allow subscribers to consume events, handle closing subscriptions; back out old exploratory ctx work; fix lint; remove unused ctx bits; add a few comments; fix test; stop publisher on abandon.
307 lines
10 KiB
Go
307 lines
10 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// eventBuffer is a single-writer, multiple-reader concurrent buffer of
// published events with a maximum length. It is kept as the head and tail of
// a singly linked list whose pointers are updated atomically. Atomic accesses
// are usually suspect as premature optimization, but this design has several
// features that significantly simplify much of our PubSub machinery.
//
// eventBuffer is adapted from the eventBuffer in Consul's agent/stream
// package, changed to enforce a maximum length for Nomad's use case.
//
// Only the most recently published events are tracked, up to the configured
// maximum size. Older events are dropped from the buffer, but they can only be
// garbage collected once the slowest reader lets go of them. Consumers learn
// about a new event when a channel on the item that used to be the tail is
// closed, which broadcasts efficiently to many watchers without running
// multiple goroutines or delivering to O(N) separate channels.
//
// Because the list pointers are updated atomically, readers take no lock and
// consume at their own pace. A reader that is still holding an item after it
// has been dropped from the buffer gets an error from bufferItem.Next instead
// of the following item.
//
// newEventBuffer seeds the buffer with a sentinel "empty" bufferItem that has
// nil Events, so subscribers can start waiting for the next update right away.
//
// The zero value of eventBuffer is _not_ usable: without the sentinel item
// there is nothing to wait on for the first published event. Always construct
// one with newEventBuffer.
//
// Calls that mutate the buffer, such as Append, must be externally
// synchronized. This lets systems that already serialize writes append without
// lock overhead (e.g. a snapshot goroutine appending thousands of events).
type eventBuffer struct {
	// size is the current item count. It is only read and written through
	// sync/atomic.
	size *int64

	// head and tail hold the *bufferItem at either end of the list.
	head, tail atomic.Value

	// maxSize is the count above which appendItem advances the head.
	maxSize int64
	// maxItemTTL is the age beyond which prune drops the head item.
	maxItemTTL time.Duration
}
|
|
|
|
// newEventBuffer creates an eventBuffer ready for use.
|
|
func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
|
|
zero := int64(0)
|
|
b := &eventBuffer{
|
|
maxSize: size,
|
|
size: &zero,
|
|
maxItemTTL: maxItemTTL,
|
|
}
|
|
|
|
item := newBufferItem(0, nil)
|
|
|
|
b.head.Store(item)
|
|
b.tail.Store(item)
|
|
|
|
return b
|
|
}
|
|
|
|
// Append a set of events from one raft operation to the buffer and notify
|
|
// watchers. After calling append, the caller must not make any further
|
|
// mutations to the events as they may have been exposed to subscribers in other
|
|
// goroutines. Append only supports a single concurrent caller and must be
|
|
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
|
|
func (b *eventBuffer) Append(index uint64, events []Event) {
|
|
b.appendItem(newBufferItem(index, events))
|
|
}
|
|
|
|
func (b *eventBuffer) appendItem(item *bufferItem) {
|
|
// Store the next item to the old tail
|
|
oldTail := b.Tail()
|
|
oldTail.link.next.Store(item)
|
|
|
|
// Update the tail to the new item
|
|
b.tail.Store(item)
|
|
|
|
// Increment the buffer size
|
|
size := atomic.AddInt64(b.size, 1)
|
|
|
|
// Check if we need to advance the head to keep the list
|
|
// constrained to max size
|
|
if size > b.maxSize {
|
|
b.advanceHead()
|
|
}
|
|
|
|
// notify waiters next event is available
|
|
close(oldTail.link.ch)
|
|
|
|
}
|
|
|
|
// advanceHead drops the current Head buffer item and notifies readers
|
|
// that the item should be discarded by closing droppedCh.
|
|
// Slow readers will prevent the old head from being GC'd until they
|
|
// discard it.
|
|
func (b *eventBuffer) advanceHead() {
|
|
old := b.Head()
|
|
next := old.link.next.Load()
|
|
|
|
close(old.link.droppedCh)
|
|
b.head.Store(next)
|
|
atomic.AddInt64(b.size, -1)
|
|
|
|
}
|
|
|
|
// Head returns the current head of the buffer. It will always exist but it may
|
|
// be a "sentinel" empty item with a nil Events slice to allow consumers to
|
|
// watch for the next update. Consumers should always check for empty Events and
|
|
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
|
|
// with NewEventBuffer
|
|
func (b *eventBuffer) Head() *bufferItem {
|
|
return b.head.Load().(*bufferItem)
|
|
}
|
|
|
|
// Tail returns the current tail of the buffer. It will always exist but it may
|
|
// be a "sentinel" empty item with a Nil Events slice to allow consumers to
|
|
// watch for the next update. Consumers should always check for empty Events and
|
|
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
|
|
// with NewEventBuffer
|
|
func (b *eventBuffer) Tail() *bufferItem {
|
|
return b.tail.Load().(*bufferItem)
|
|
}
|
|
|
|
// StarStartAtClosest returns the closest bufferItem to a requested starting
|
|
// index as well as the offset between the requested index and returned one.
|
|
func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
|
|
item := b.Head()
|
|
if index < item.Index {
|
|
return item, int(item.Index) - int(index)
|
|
}
|
|
if item.Index == index {
|
|
return item, 0
|
|
}
|
|
|
|
for {
|
|
prev := item
|
|
item = item.NextNoBlock()
|
|
if item == nil {
|
|
return prev, int(index) - int(prev.Index)
|
|
}
|
|
if index < item.Index {
|
|
return item, int(item.Index) - int(index)
|
|
}
|
|
if index == item.Index {
|
|
return item, 0
|
|
}
|
|
}
|
|
}
|
|
|
|
// Len returns the current length of the buffer
|
|
func (b *eventBuffer) Len() int {
|
|
return int(atomic.LoadInt64(b.size))
|
|
}
|
|
|
|
// prune advances the head of the buffer until the head buffer item TTL
|
|
// is no longer expired. It should be externally synchronized as it mutates
|
|
// the buffer of items.
|
|
func (b *eventBuffer) prune() {
|
|
for {
|
|
head := b.Head()
|
|
if b.Len() == 0 {
|
|
return
|
|
}
|
|
|
|
if time.Since(head.createdAt) > b.maxItemTTL {
|
|
b.advanceHead()
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// bufferItem represents a set of events published by a single raft operation.
|
|
// The first item returned by a newly constructed buffer will have nil Events.
|
|
// It is a sentinel value which is used to wait on the next events via Next.
|
|
//
|
|
// To iterate to the next event, a Next method may be called which may block if
|
|
// there is no next element yet.
|
|
//
|
|
// Holding a pointer to the item keeps all the events published since in memory
|
|
// so it's important that subscribers don't hold pointers to buffer items after
|
|
// they have been delivered except where it's intentional to maintain a cache or
|
|
// trailing store of events for performance reasons.
|
|
//
|
|
// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
|
|
// inside as these are shared between all readers.
|
|
type bufferItem struct {
|
|
// Events is the set of events published at one raft index. This may be nil as
|
|
// a sentinel value to allow watching for the first event in a buffer. Callers
|
|
// should check and skip nil Events at any point in the buffer. It will also
|
|
// be nil if the producer appends an Error event because they can't complete
|
|
// the request to populate the buffer. Err will be non-nil in this case.
|
|
Events []Event
|
|
|
|
Index uint64
|
|
|
|
// Err is non-nil if the producer can't complete their task and terminates the
|
|
// buffer. Subscribers should return the error to clients and cease attempting
|
|
// to read from the buffer.
|
|
Err error
|
|
|
|
// link holds the next pointer and channel. This extra bit of indirection
|
|
// allows us to splice buffers together at arbitrary points without including
|
|
// events in one buffer just for the side-effect of watching for the next set.
|
|
// The link may not be mutated once the event is appended to a buffer.
|
|
link *bufferLink
|
|
|
|
createdAt time.Time
|
|
}
|
|
|
|
// bufferLink connects a bufferItem to its successor and carries the channels
// used to signal readers.
type bufferLink struct {
	// next is the atomically updated pointer to the following item in the
	// buffer. The single publisher writes it exactly once, and it is always
	// set by the time ch is closed.
	next atomic.Value

	// ch is closed, once, when the following item is published; next has been
	// set by then. The field itself must never be changed (e.g. set to nil)
	// because that would be racy.
	ch chan struct{}

	// droppedCh is closed when the item is dropped from the buffer due to
	// sizing constraints.
	droppedCh chan struct{}
}
|
|
|
|
// newBufferItem returns a blank buffer item with a link and chan ready to have
|
|
// the fields set and be appended to a buffer.
|
|
func newBufferItem(index uint64, events []Event) *bufferItem {
|
|
return &bufferItem{
|
|
link: &bufferLink{
|
|
ch: make(chan struct{}),
|
|
droppedCh: make(chan struct{}),
|
|
},
|
|
Events: events,
|
|
Index: index,
|
|
createdAt: time.Now(),
|
|
}
|
|
}
|
|
|
|
// Next return the next buffer item in the buffer. It may block until ctx is
|
|
// cancelled or until the next item is published.
|
|
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
|
|
// See if there is already a next value, block if so. Note we don't rely on
|
|
// state change (chan nil) as that's not threadsafe but detecting close is.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-forceClose:
|
|
return nil, fmt.Errorf("subscription closed")
|
|
case <-i.link.ch:
|
|
}
|
|
|
|
// Check if the reader is too slow and the event buffer as discarded the event
|
|
select {
|
|
case <-i.link.droppedCh:
|
|
return nil, fmt.Errorf("event dropped from buffer")
|
|
default:
|
|
}
|
|
|
|
// If channel closed, there must be a next item to read
|
|
nextRaw := i.link.next.Load()
|
|
if nextRaw == nil {
|
|
// shouldn't be possible
|
|
return nil, errors.New("invalid next item")
|
|
}
|
|
next := nextRaw.(*bufferItem)
|
|
if next.Err != nil {
|
|
return nil, next.Err
|
|
}
|
|
return next, nil
|
|
}
|
|
|
|
// NextNoBlock returns the next item in the buffer without blocking. If it
|
|
// reaches the most recent item it will return nil.
|
|
func (i *bufferItem) NextNoBlock() *bufferItem {
|
|
nextRaw := i.link.next.Load()
|
|
if nextRaw == nil {
|
|
return nil
|
|
}
|
|
return nextRaw.(*bufferItem)
|
|
}
|
|
|
|
// NextLink returns either the next item in the buffer if there is one, or
|
|
// an empty item (that will be ignored by subscribers) that has a pointer to
|
|
// the same link as this bufferItem (but none of the bufferItem content).
|
|
// When the link.ch is closed, subscriptions will be notified of the next item.
|
|
func (i *bufferItem) NextLink() *bufferItem {
|
|
next := i.NextNoBlock()
|
|
if next == nil {
|
|
// Return an empty item that can be followed to the next item published.
|
|
return &bufferItem{link: i.link}
|
|
}
|
|
return next
|
|
}
|