129 lines
3.2 KiB
Go
129 lines
3.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package submatview
|
|
|
|
import (
|
|
"context"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// RPCMaterializer is a materializer for a streaming cache type
|
|
// and manages the actual streaming RPC call to the servers behind
|
|
// the scenes until the cache result is discarded when its TTL expires.
|
|
type RPCMaterializer struct {
|
|
deps Deps
|
|
client StreamClient
|
|
handler eventHandler
|
|
|
|
mat *materializer
|
|
}
|
|
|
|
var _ Materializer = (*RPCMaterializer)(nil)
|
|
|
|
// StreamClient provides a subscription to state change events.
|
|
type StreamClient interface {
|
|
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
|
|
}
|
|
|
|
// NewRPCMaterializer returns a new Materializer. Run must be called to start it.
|
|
func NewRPCMaterializer(client StreamClient, deps Deps) *RPCMaterializer {
|
|
m := RPCMaterializer{
|
|
deps: deps,
|
|
client: client,
|
|
mat: newMaterializer(deps.Logger, deps.View, deps.Waiter),
|
|
}
|
|
return &m
|
|
}
|
|
|
|
// Query implements Materializer
|
|
func (m *RPCMaterializer) Query(ctx context.Context, minIndex uint64) (Result, error) {
|
|
return m.mat.query(ctx, minIndex)
|
|
}
|
|
|
|
// Run receives events from the StreamClient and sends them to the View. It runs
|
|
// until ctx is cancelled, so it is expected to be run in a goroutine.
|
|
// Mirrors implementation of LocalMaterializer
|
|
//
|
|
// Run implements Materializer
|
|
func (m *RPCMaterializer) Run(ctx context.Context) {
|
|
for {
|
|
req := m.deps.Request(m.mat.currentIndex())
|
|
err := m.subscribeOnce(ctx, req)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
m.mat.handleError(req, err)
|
|
|
|
if err := m.mat.retryWaiter.Wait(ctx); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// subscribeOnce opens a new subscribe streaming call to the servers and runs
|
|
// for its lifetime or until the view is closed.
|
|
func (m *RPCMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
m.handler = initialHandler(req.Index)
|
|
|
|
s, err := m.client.Subscribe(ctx, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
event, err := s.Recv()
|
|
switch {
|
|
case isGrpcStatus(err, codes.Aborted):
|
|
m.mat.reset()
|
|
return resetErr("stream reset requested")
|
|
case err != nil:
|
|
return err
|
|
}
|
|
|
|
m.handler, err = m.handler(m, event)
|
|
if err != nil {
|
|
m.mat.reset()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func isGrpcStatus(err error, code codes.Code) bool {
|
|
s, ok := status.FromError(err)
|
|
return ok && s.Code() == code
|
|
}
|
|
|
|
// resetErr represents a server request to reset the subscription, it's typed so
|
|
// we can mark it as temporary and so attempt to retry first time without
|
|
// notifying clients.
|
|
type resetErr string
|
|
|
|
// Temporary Implements the internal Temporary interface
|
|
func (e resetErr) Temporary() bool {
|
|
return true
|
|
}
|
|
|
|
// Error implements error
|
|
func (e resetErr) Error() string {
|
|
return string(e)
|
|
}
|
|
|
|
// updateView implements viewState
|
|
func (m *RPCMaterializer) updateView(events []*pbsubscribe.Event, index uint64) error {
|
|
return m.mat.updateView(events, index)
|
|
}
|
|
|
|
// reset implements viewState
|
|
func (m *RPCMaterializer) reset() {
|
|
m.mat.reset()
|
|
}
|