open-consul/agent/submatview/materializer.go

228 lines
6.1 KiB
Go
Raw Normal View History

package submatview
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
// View receives events from, and returns results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
// Update is called when one or more events are received. The first call will
// include _all_ events in the initial snapshot, which may be an empty set.
// Subsequent calls will contain one or more update events in the order they
// are received.
Update(events []*pbsubscribe.Event) error
// Result returns the type-specific cache result based on the state. When no
// events have been delivered yet the result should be an empty value type
// suitable to return to clients in case there is an empty result on the
// servers. The index the materialized view represents is maintained
// separately and is passed in so that a return type with an Index field can
// populate it. This allows implementations to not worry about maintaining
// indexes seen during Update.
Result(index uint64) interface{}
// Reset the view to the zero state, done in preparation for receiving a new
// snapshot.
Reset()
}
// Result is what a query against the materialized View returns.
type Result struct {
// Index is the view index the materializer recorded for this result.
Index uint64
// Value is whatever View.Result produced.
Value interface{}
// Cached is true if the requested value was already available locally. If
// the value is false, it indicates that the query did not return right away
// and had to wait for an update (or for its context to end).
Cached bool
}
// Deps groups a View, a Logger, a retry Waiter and a Request constructor.
// NOTE(review): presumably these are the dependencies handed to whatever
// builds a materializer; nothing in this part of the file consumes Deps, so
// confirm against its callers.
type Deps struct {
View View
Logger hclog.Logger
Waiter *retry.Waiter
// Request builds the subscribe request for the given index.
// NOTE(review): presumably index is the last index already seen - confirm.
Request func(index uint64) *pbsubscribe.SubscribeRequest
}
// materializer consumes the event stream, handling any framing events, and
// allows for querying the materialized view.
type materializer struct {
// retryWaiter is reset after every successful view update; its failure
// count is consulted when a subscribe error is handled.
retryWaiter *retry.Waiter
// logger is used to report failed subscribe calls.
logger hclog.Logger
// lock protects the mutable state - all fields below it must only be accessed
// while holding lock.
lock sync.Mutex
// index is the index stored by the last successful updateView, or 0 after
// reset.
index uint64
view View
// updateCh is closed, and replaced by a fresh channel, every time waiting
// queries need to be woken up.
updateCh chan struct{}
// err is the error passed to the most recent notification; it is nil after
// a successful view update.
err error
}
// newMaterializer returns a materializer wrapping view that logs through
// logger. A nil waiter is replaced with the one built by defaultWaiter. The
// update channel is created here so that queries can wait on it immediately.
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
	if waiter == nil {
		waiter = defaultWaiter()
	}
	return &materializer{
		retryWaiter: waiter,
		logger:      logger,
		view:        view,
		updateCh:    make(chan struct{}),
	}
}
// query blocks until the index of the View is greater than minIndex, an error
// is delivered through the update channel, or the context is cancelled.
//
// A result whose index is non-zero and already greater than minIndex is
// returned immediately with Cached set to true. When ctx ends first, the
// latest view value is returned together with ctx.Err().
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
m.lock.Lock()
result := Result{
Index: m.index,
Value: m.view.Result(m.index),
}
// Capture the current channel while holding the lock, so that a notification
// issued after the lock is released still closes the channel we wait on.
updateCh := m.updateCh
m.lock.Unlock()
// If our index is > minIndex return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan.
if result.Index > 0 && result.Index > minIndex {
result.Cached = true
return result, nil
}
for {
select {
case <-updateCh:
// View updated, return the new result
m.lock.Lock()
result.Index = m.index
switch {
case m.err != nil:
// The notification carried an error. result.Value is still the value
// read on entry; only the index has been refreshed.
err := m.err
m.lock.Unlock()
return result, err
case result.Index <= minIndex:
// get a reference to the new updateCh, the previous one was closed
updateCh = m.updateCh
m.lock.Unlock()
continue
}
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, nil
case <-ctx.Done():
// Update the result value to the latest because callers may still
// use the value when the error is context.DeadlineExceeded
// NOTE(review): result.Index is not refreshed in this branch, so it can
// lag behind the index used to build Value - confirm this is intended.
m.lock.Lock()
result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, ctx.Err()
}
}
}
// currentIndex returns the index currently stored in the materializer. The
// value is read while holding m.lock.
func (m *materializer) currentIndex() uint64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.index
}
// notifyUpdateLocked records err as the outcome of the latest update and wakes
// everything waiting on the update channel by closing it. A fresh channel is
// installed for subsequent waiters. The caller must hold m.lock.
func (m *materializer) notifyUpdateLocked(err error) {
	closing := m.updateCh
	m.updateCh = make(chan struct{})
	m.err = err
	close(closing)
}
// reset returns the materializer to its initial state so that a new stream can
// be consumed from scratch: the view is reset and the stored index goes back
// to zero. Waiters on the update channel are not notified by this call.
func (m *materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.view.Reset()
	m.index = 0
}
// updateView applies events to the view and, on success, stores index as the
// corresponding Raft index, wakes any waiting queries with a nil error and
// resets the retry waiter. If the view rejects the events, its error is
// returned and nothing else changes.
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	err := m.view.Update(events)
	if err == nil {
		m.index = index
		m.notifyUpdateLocked(nil)
		m.retryWaiter.Reset()
	}
	return err
}
// handleError deals with a failed subscribe call for req. When err is not
// temporary, or when earlier failures were already recorded by the retry
// waiter, waiting queries are woken up with err. The failure is always logged
// together with the topic, the failure count and the request's subject.
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
	failures := m.retryWaiter.Failures()
	if isNonTemporaryOrConsecutiveFailure(err, failures) {
		m.lock.Lock()
		m.notifyUpdateLocked(err)
		m.lock.Unlock()
	}

	logger := m.logger.With(
		"err", err,
		"topic", req.Topic,
		"failure_count", failures+1,
	)

	// Describe the subject of the subscription: wildcard first, then a named
	// subject, and finally the deprecated Key field.
	switch {
	case req.GetWildcardSubject():
		logger = logger.With("wildcard_subject", true)
	case req.GetNamedSubject() != nil:
		logger = logger.With("key", req.GetNamedSubject().Key)
	default:
		logger = logger.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field
	}

	logger.Error("subscribe call failed")
}
// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
//
// An error counts as temporary when it, or the first error in its Unwrap chain
// that has a Temporary method, reports Temporary() == true. Using errors.As
// means a temporary error that was wrapped (for example with fmt.Errorf and
// %w) is still recognised. A nil err is treated as non-temporary.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	if failures > 0 {
		return true
	}
	// temporary is an interface used by net and other std lib packages to
	// show error types represent temporary/recoverable errors.
	var temp interface {
		Temporary() bool
	}
	return !errors.As(err, &temp) || !temp.Temporary()
}
// defaultWaiter builds the retry.Waiter that newMaterializer falls back to
// when it is not given one.
func defaultWaiter() *retry.Waiter {
	// Backoff is configured with a 200ms factor, no minimum wait, a 60s
	// maximum wait and a jitter of 100.
	// NOTE(review): the earlier comment here described waits that double on
	// each attempt (200-400ms, 400-800ms, ...) and settle at 6000ms, which
	// does not match the 60s MaxWait below - confirm the intended cap and the
	// exact progression against retry.Waiter.
	w := retry.Waiter{
		MinFailures: 1,
		Factor:      200 * time.Millisecond,
		MinWait:     0,
		MaxWait:     60 * time.Second,
		Jitter:      retry.NewJitter(100),
	}
	return &w
}