submatview: refactor Materializer

Refactor of Materializer.Run
Use handlers to manage state in Materializer
Rename Materializer receiver
rename m.l to m.lock, and flip some conditionals to remove the negative.
Improve godoc, rename Deps, move resetErr, and pass err into notifyUpdate
Update for NewSnapshotToFollow events
Refactor to move context cancel out of Materializer
This commit is contained in:
Daniel Nephin 2020-10-01 02:36:36 -04:00
parent e8c7881196
commit 58cf09247b
5 changed files with 266 additions and 302 deletions

View File

@ -17,11 +17,11 @@ func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.E
}
}
func newEndOfEmptySnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Topic: topic,
Index: index,
Payload: &pbsubscribe.Event_EndOfEmptySnapshot{EndOfEmptySnapshot: true},
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}

View File

@ -37,46 +37,59 @@ func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices
}
type MaterializerDeps struct {
Client submatview.StreamingClient
Client submatview.StreamClient
Logger hclog.Logger
}
// Fetch implements cache.Type
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
if opts.LastResult != nil && opts.LastResult.State != nil {
return opts.LastResult.State.(*submatview.Materializer).Fetch(opts)
state := opts.LastResult.State.(*streamingHealthState)
return state.materializer.Fetch(state.done, opts)
}
srvReq := req.(*structs.ServiceSpecificRequest)
subReq := pbsubscribe.SubscribeRequest{
newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: srvReq.ServiceName,
Token: srvReq.Token,
Index: srvReq.MinQueryIndex,
Datacenter: srvReq.Datacenter,
Index: index,
}
if srvReq.Connect {
subReq.Topic = pbsubscribe.Topic_ServiceHealthConnect
req.Topic = pbsubscribe.Topic_ServiceHealthConnect
}
view, err := newMaterializer(c.deps, subReq, srvReq.Filter)
return req
}
m, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
if err != nil {
return cache.FetchResult{}, err
}
return view.Fetch(opts)
ctx, cancel := context.WithCancel(context.TODO())
go m.Run(ctx)
result, err := m.Fetch(ctx.Done(), opts)
result.State = &streamingHealthState{
materializer: m,
done: ctx.Done(),
cancel: cancel,
}
return result, err
}
func newMaterializer(
d MaterializerDeps,
r pbsubscribe.SubscribeRequest,
r func(uint64) pbsubscribe.SubscribeRequest,
filter string,
) (*submatview.Materializer, error) {
state, err := newHealthViewState(filter)
view, err := newHealthViewState(filter)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.TODO())
view := submatview.NewMaterializer(submatview.ViewDeps{
State: state,
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: d.Client,
Logger: d.Logger,
Waiter: &retry.Waiter{
@ -86,15 +99,22 @@ func newMaterializer(
Jitter: retry.NewJitter(100),
},
Request: r,
Stop: cancel,
Done: ctx.Done(),
})
go view.Run(ctx)
return view, nil
}), nil
}
type streamingHealthState struct {
materializer *submatview.Materializer
done <-chan struct{}
cancel func()
}
func (c *streamingHealthState) Close() error {
c.cancel()
return nil
}
func newHealthViewState(filterExpr string) (submatview.View, error) {
s := &healthViewState{state: make(map[string]structs.CheckServiceNode)}
s := &healthView{state: make(map[string]structs.CheckServiceNode)}
// We apply filtering to the raw CheckServiceNodes before we are done mutating
// state in Update to save from storing stuff in memory we'll only filter
@ -105,19 +125,19 @@ func newHealthViewState(filterExpr string) (submatview.View, error) {
return s, err
}
// healthViewState implements View for storing the view state
// healthView implements submatview.View for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier but we could just store a result type
// (IndexedCheckServiceNodes) and update it in place for each event - that
// involves re-sorting each time etc. though.
type healthViewState struct {
type healthView struct {
state map[string]structs.CheckServiceNode
// TODO: test case with filter
filter *bexpr.Filter
}
// Update implements View
func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
func (s *healthView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
@ -147,7 +167,7 @@ func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
}
// Result implements View
func (s *healthViewState) Result(index uint64) (interface{}, error) {
func (s *healthView) Result(index uint64) (interface{}, error) {
var result structs.IndexedCheckServiceNodes
// Avoid a nil slice if there are no results in the view
// TODO: why this ^
@ -159,6 +179,6 @@ func (s *healthViewState) Result(index uint64) (interface{}, error) {
return &result, nil
}
func (s *healthViewState) Reset() {
func (s *healthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode)
}

View File

@ -28,10 +28,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
Timeout: time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
@ -111,7 +110,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// After the error the view should re-subscribe with same index so will get
// a "resume stream".
client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
// Next fetch will continue to block until timeout and receive the same
// result.
@ -157,7 +156,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
// After the error the view should re-subscribe with same index so will get
// a "resume stream".
client.QueueEvents(newEndOfEmptySnapshotEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
client.QueueEvents(newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth, opts.MinIndex))
}()
// Next fetch should return the error

View File

@ -0,0 +1,51 @@
package submatview
import "github.com/hashicorp/consul/proto/pbsubscribe"
type eventHandler func(events *pbsubscribe.Event) (eventHandler, error)
func (m *Materializer) initialHandler(index uint64) eventHandler {
if index == 0 {
return newSnapshotHandler(m)
}
return m.resumeStreamHandler
}
type snapshotHandler struct {
material *Materializer
events []*pbsubscribe.Event
}
func newSnapshotHandler(m *Materializer) eventHandler {
return (&snapshotHandler{material: m}).handle
}
func (h *snapshotHandler) handle(event *pbsubscribe.Event) (eventHandler, error) {
if event.GetEndOfSnapshot() {
err := h.material.updateView(h.events, event.Index)
return h.material.eventStreamHandler, err
}
h.events = append(h.events, eventsFromEvent(event)...)
return h.handle, nil
}
func (m *Materializer) eventStreamHandler(event *pbsubscribe.Event) (eventHandler, error) {
err := m.updateView(eventsFromEvent(event), event.Index)
return m.eventStreamHandler, err
}
func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
if batch := event.GetEventBatch(); batch != nil {
return batch.Events
}
return []*pbsubscribe.Event{event}
}
func (m *Materializer) resumeStreamHandler(event *pbsubscribe.Event) (eventHandler, error) {
if event.GetNewSnapshotToFollow() {
m.reset()
return newSnapshotHandler(m), nil
}
return m.eventStreamHandler(event)
}

View File

@ -2,7 +2,6 @@ package submatview
import (
"context"
"errors"
"sync"
"time"
@ -16,8 +15,9 @@ import (
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// View is the interface used to manage they type-specific
// materialized view logic.
// View receives events from, and return results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
// Update is called when one or more events are received. The first call will
// include _all_ events in the initial snapshot which may be an empty set.
@ -39,7 +39,129 @@ type View interface {
Reset()
}
type Filter func(seq interface{}) (interface{}, error)
// Materializer consumes the event stream, handling any framing events, and
// sends the events to View as they are received.
//
// Materializer is used as the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when TTL expires.
type Materializer struct {
deps Deps
retryWaiter *retry.Waiter
handler eventHandler
// lock protects the mutable state - all fields below it must only be accessed
// while holding lock.
lock sync.Mutex
index uint64
view View
updateCh chan struct{}
err error
}
type Deps struct {
View View
Client StreamClient
Logger hclog.Logger
Waiter *retry.Waiter
Request func(index uint64) pbsubscribe.SubscribeRequest
Stop func()
}
// StreamClient provides a subscription to state change events.
type StreamClient interface {
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}
// NewMaterializer returns a new Materializer. Run must be called to start it.
func NewMaterializer(deps Deps) *Materializer {
v := &Materializer{
deps: deps,
view: deps.View,
retryWaiter: deps.Waiter,
}
v.reset()
return v
}
// Run receives events from the StreamClient and sends them to the View. It runs
// until ctx is cancelled, so it is expected to be run in a goroutine.
func (m *Materializer) Run(ctx context.Context) {
for {
req := m.deps.Request(m.index)
err := m.runSubscription(ctx, req)
if ctx.Err() != nil {
return
}
m.lock.Lock()
// TODO: move this into a func
// If this is a temporary error and it's the first consecutive failure,
// retry to see if we can get a result without erroring back to clients.
// If it's non-temporary or a repeated failure return to clients while we
// retry to get back in a good state.
if _, ok := err.(temporary); !ok || m.retryWaiter.Failures() > 0 {
m.notifyUpdateLocked(err)
}
waitCh := m.retryWaiter.Failed()
failures := m.retryWaiter.Failures()
m.lock.Unlock()
m.deps.Logger.Error("subscribe call failed",
"err", err,
"topic", req.Topic,
"key", req.Key,
"failure_count", failures)
select {
case <-ctx.Done():
return
case <-waitCh:
}
}
}
// temporary is a private interface as used by net and other std lib packages to
// show error types represent temporary/recoverable errors.
type temporary interface {
Temporary() bool
}
// runSubscription opens a new subscribe streaming call to the servers and runs
// for it's lifetime or until the view is closed.
func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.SubscribeRequest) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
m.handler = m.initialHandler(req.Index)
s, err := m.deps.Client.Subscribe(ctx, &req)
if err != nil {
return err
}
for {
event, err := s.Recv()
switch {
case isGrpcStatus(err, codes.Aborted):
m.reset()
return resetErr("stream reset requested")
case err != nil:
return err
}
m.handler, err = m.handler(event)
if err != nil {
m.reset()
return err
}
}
}
func isGrpcStatus(err error, code codes.Code) bool {
s, ok := status.FromError(err)
return ok && s.Code() == code
}
// resetErr represents a server request to reset the subscription, it's typed so
// we can mark it as temporary and so attempt to retry first time without
@ -56,279 +178,52 @@ func (e resetErr) Error() string {
return string(e)
}
// TODO: update godoc
// Materializer is a partial view of the state on servers, maintained via
// streaming subscriptions. It is specialized for different cache types by
// providing a View that encapsulates the logic to update the
// state and format it as the correct result type.
//
// The Materializer object becomes the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when TTL expires.
type Materializer struct {
// Properties above the lock are immutable after the view is constructed in
// NewMaterializer and must not be modified.
deps ViewDeps
// reset clears the state ready to start a new stream from scratch.
func (m *Materializer) reset() {
m.lock.Lock()
defer m.lock.Unlock()
// l protects the mutable state - all fields below it must only be accessed
// while holding l.
l sync.Mutex
index uint64
view View
snapshotDone bool
updateCh chan struct{}
retryWaiter *retry.Waiter
err error
m.view.Reset()
m.index = 0
m.notifyUpdateLocked(nil)
m.retryWaiter.Reset()
}
// TODO: rename
type ViewDeps struct {
State View
Client StreamingClient
Logger hclog.Logger
Waiter *retry.Waiter
Request pbsubscribe.SubscribeRequest
Stop func()
Done <-chan struct{}
}
func (m *Materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
m.lock.Lock()
defer m.lock.Unlock()
// StreamingClient is the interface we need from the gRPC client stub. Separate
// interface simplifies testing.
type StreamingClient interface {
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}
// NewMaterializer retrieves an existing view from the cache result
// state if one exists, otherwise creates a new one. Note that the returned view
// MUST have Close called eventually to avoid leaking resources. Typically this
// is done automatically if the view is returned in a cache.Result.State when
// the cache evicts the result. If the view is not returned in a result state
// though Close must be called some other way to avoid leaking the goroutine and
// memory.
func NewMaterializer(deps ViewDeps) *Materializer {
v := &Materializer{
deps: deps,
view: deps.State,
retryWaiter: deps.Waiter,
if err := m.view.Update(events); err != nil {
return err
}
v.reset()
return v
}
// Close implements io.Close and discards view state and stops background view
// maintenance.
func (v *Materializer) Close() error {
v.l.Lock()
defer v.l.Unlock()
v.deps.Stop()
m.index = index
m.notifyUpdateLocked(nil)
m.retryWaiter.Reset()
return nil
}
func (v *Materializer) Run(ctx context.Context) {
if ctx.Err() != nil {
return
}
// Loop in case stream resets and we need to start over
for {
err := v.runSubscription(ctx)
if err != nil {
if ctx.Err() != nil {
// Err doesn't matter and is likely just context cancelled
return
}
v.l.Lock()
// If this is a temporary error and it's the first consecutive failure,
// retry to see if we can get a result without erroring back to clients.
// If it's non-temporary or a repeated failure return to clients while we
// retry to get back in a good state.
if _, ok := err.(temporary); !ok || v.retryWaiter.Failures() > 0 {
// Report error to blocked fetchers
v.err = err
v.notifyUpdateLocked()
}
waitCh := v.retryWaiter.Failed()
failures := v.retryWaiter.Failures()
v.l.Unlock()
v.deps.Logger.Error("subscribe call failed",
"err", err,
"topic", v.deps.Request.Topic,
"key", v.deps.Request.Key,
"failure_count", failures)
select {
case <-ctx.Done():
return
case <-waitCh:
}
}
// Loop and keep trying to resume subscription after error
}
}
// temporary is a private interface as used by net and other std lib packages to
// show error types represent temporary/recoverable errors.
type temporary interface {
Temporary() bool
}
// runSubscription opens a new subscribe streaming call to the servers and runs
// for it's lifetime or until the view is closed.
func (v *Materializer) runSubscription(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Copy the request template
req := v.deps.Request
v.l.Lock()
// Update request index to be the current view index in case we are resuming a
// broken stream.
req.Index = v.index
// Make local copy so we don't have to read with a lock for every event. We
// are the only goroutine that can update so we know it won't change without
// us knowing but we do need lock to protect external readers when we update.
snapshotDone := v.snapshotDone
v.l.Unlock()
s, err := v.deps.Client.Subscribe(ctx, &req)
if err != nil {
return err
}
snapshotEvents := make([]*pbsubscribe.Event, 0)
for {
event, err := s.Recv()
switch {
case isGrpcStatus(err, codes.Aborted):
v.reset()
return resetErr("stream reset requested")
case err != nil:
return err
}
if event.GetEndOfSnapshot() {
// Hold lock while mutating view state so implementer doesn't need to
// worry about synchronization.
v.l.Lock()
// Deliver snapshot events to the View state
if err := v.view.Update(snapshotEvents); err != nil {
v.l.Unlock()
// This error is kinda fatal to the view - we didn't apply some events
// the server sent us which means our view is now not in sync. The only
// thing we can do is start over and hope for a better outcome.
v.reset()
return err
}
// Done collecting these now
snapshotEvents = nil
v.snapshotDone = true
// update our local copy so we can read it without lock.
snapshotDone = true
v.index = event.Index
// We have a good result, reset the error flag
v.err = nil
v.retryWaiter.Reset()
// Notify watchers of the update to the view
v.notifyUpdateLocked()
v.l.Unlock()
continue
}
if event.GetEndOfEmptySnapshot() {
// We've opened a new subscribe with a non-zero index to resume a
// connection and the server confirms it's not sending a new snapshot.
if !snapshotDone {
// We've somehow got into a bad state here - the server thinks we have
// an up-to-date snapshot but we don't think we do. Reset and start
// over.
v.reset()
return errors.New("stream resume sent but no local snapshot")
}
// Just continue on as we were!
continue
}
// We have an event for the topic
events := []*pbsubscribe.Event{event}
// If the event is a batch, unwrap and deliver the raw events
if batch := event.GetEventBatch(); batch != nil {
events = batch.Events
}
if snapshotDone {
// We've already got a snapshot, this is an update, deliver it right away.
v.l.Lock()
if err := v.view.Update(events); err != nil {
v.l.Unlock()
// This error is kinda fatal to the view - we didn't apply some events
// the server sent us which means our view is now not in sync. The only
// thing we can do is start over and hope for a better outcome.
v.reset()
return err
}
// Notify watchers of the update to the view
v.index = event.Index
// We have a good result, reset the error flag
v.err = nil
v.retryWaiter.Reset()
v.notifyUpdateLocked()
v.l.Unlock()
} else {
snapshotEvents = append(snapshotEvents, events...)
}
}
}
func isGrpcStatus(err error, code codes.Code) bool {
s, ok := status.FromError(err)
return ok && s.Code() == code
}
// reset clears the state ready to start a new stream from scratch.
func (v *Materializer) reset() {
v.l.Lock()
defer v.l.Unlock()
v.view.Reset()
v.notifyUpdateLocked()
// Always start from zero when we have a new state so we load a snapshot from
// the servers.
v.index = 0
v.snapshotDone = false
v.err = nil
v.retryWaiter.Reset()
}
// notifyUpdateLocked closes the current update channel and recreates a new
// one. It must be called while holding the s.l lock.
func (v *Materializer) notifyUpdateLocked() {
if v.updateCh != nil {
close(v.updateCh)
// one. It must be called while holding the s.lock lock.
func (m *Materializer) notifyUpdateLocked(err error) {
m.err = err
if m.updateCh != nil {
close(m.updateCh)
}
v.updateCh = make(chan struct{})
m.updateCh = make(chan struct{})
}
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return it's results directly.
func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
var result cache.FetchResult
// Get current view Result and index
v.l.Lock()
index := v.index
val, err := v.view.Result(v.index)
updateCh := v.updateCh
v.l.Unlock()
m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index)
updateCh := m.updateCh
m.lock.Unlock()
if err != nil {
return result, err
@ -336,7 +231,6 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error)
result.Index = index
result.Value = val
result.State = v
// If our index is > req.Index return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
@ -355,18 +249,18 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error)
select {
case <-updateCh:
// View updated, return the new result
v.l.Lock()
result.Index = v.index
m.lock.Lock()
result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = v.updateCh
fetchErr := v.err
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = v.view.Result(v.index)
result.Value, err = m.view.Result(m.index)
}
v.l.Unlock()
m.lock.Unlock()
// If there was a non-transient error return it
if fetchErr != nil {
@ -391,7 +285,7 @@ func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error)
// Just return whatever we got originally, might still be empty
return result, nil
case <-v.deps.Done:
case <-done:
return result, context.Canceled
}
}