rpcclient/health: integrate submatview.Store into rpcclient/health
This commit is contained in:
parent
26c44aacde
commit
aadb46b209
|
@ -5,11 +5,14 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
NetRPC NetRPC
|
||||
Cache CacheGetter
|
||||
ViewStore MaterializedViewStore
|
||||
MaterializerDeps MaterializerDeps
|
||||
// CacheName to use for service health.
|
||||
CacheName string
|
||||
// CacheNameIngress is the name of the cache type to use for ingress
|
||||
|
@ -26,6 +29,11 @@ type CacheGetter interface {
|
|||
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
|
||||
}
|
||||
|
||||
type MaterializedViewStore interface {
|
||||
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
|
||||
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
|
||||
}
|
||||
|
||||
func (c *Client) ServiceNodes(
|
||||
ctx context.Context,
|
||||
req structs.ServiceSpecificRequest,
|
||||
|
@ -56,6 +64,20 @@ func (c *Client) getServiceNodes(
|
|||
return out, cache.ResultMeta{}, err
|
||||
}
|
||||
|
||||
if req.Source.Node == "" {
|
||||
sr, err := newServiceRequest(req, c.MaterializerDeps)
|
||||
if err != nil {
|
||||
return out, cache.ResultMeta{}, err
|
||||
}
|
||||
|
||||
result, err := c.ViewStore.Get(ctx, sr)
|
||||
if err != nil {
|
||||
return out, cache.ResultMeta{}, err
|
||||
}
|
||||
// TODO: can we store non-pointer
|
||||
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
|
||||
}
|
||||
|
||||
cacheName := c.CacheName
|
||||
if req.Ingress {
|
||||
cacheName = c.CacheNameIngress
|
||||
|
@ -86,3 +108,38 @@ func (c *Client) Notify(
|
|||
}
|
||||
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
|
||||
}
|
||||
|
||||
func newServiceRequest(req structs.ServiceSpecificRequest, deps MaterializerDeps) (serviceRequest, error) {
|
||||
view, err := newHealthView(req)
|
||||
if err != nil {
|
||||
return serviceRequest{}, err
|
||||
}
|
||||
return serviceRequest{
|
||||
ServiceSpecificRequest: req,
|
||||
view: view,
|
||||
deps: deps,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type serviceRequest struct {
|
||||
structs.ServiceSpecificRequest
|
||||
view *healthView
|
||||
deps MaterializerDeps
|
||||
}
|
||||
|
||||
func (r serviceRequest) CacheInfo() cache.RequestInfo {
|
||||
return r.ServiceSpecificRequest.CacheInfo()
|
||||
}
|
||||
|
||||
func (r serviceRequest) Type() string {
|
||||
return "service-health"
|
||||
}
|
||||
|
||||
func (r serviceRequest) NewMaterializer() *submatview.Materializer {
|
||||
return submatview.NewMaterializer(submatview.Deps{
|
||||
View: r.view,
|
||||
Client: r.deps.Client,
|
||||
Logger: r.deps.Logger,
|
||||
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,74 +1,27 @@
|
|||
package cachetype
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
)
|
||||
|
||||
const (
|
||||
// Recommended name for registration.
|
||||
StreamingHealthServicesName = "streaming-health-services"
|
||||
)
|
||||
|
||||
// StreamingHealthServices supports fetching discovering service instances via the
|
||||
// catalog using the streaming gRPC endpoint.
|
||||
type StreamingHealthServices struct {
|
||||
RegisterOptionsBlockingRefresh
|
||||
deps MaterializerDeps
|
||||
}
|
||||
|
||||
// RegisterOptions returns options with a much shorter LastGetTTL than the default.
|
||||
// Unlike other cache-types, StreamingHealthServices runs a materialized view in
|
||||
// the background which will receive streamed events from a server. If the cache
|
||||
// is not being used, that stream uses memory on the server and network transfer
|
||||
// between the client and the server.
|
||||
// The materialize view and the stream are stopped when the cache entry expires,
|
||||
// so using a shorter TTL ensures the cache entry expires sooner.
|
||||
func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
|
||||
opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
|
||||
opts.LastGetTTL = 20 * time.Minute
|
||||
return opts
|
||||
}
|
||||
|
||||
// NewStreamingHealthServices creates a cache-type for watching for service
|
||||
// health results via streaming updates.
|
||||
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
|
||||
return &StreamingHealthServices{deps: deps}
|
||||
}
|
||||
|
||||
type MaterializerDeps struct {
|
||||
Client submatview.StreamClient
|
||||
Logger hclog.Logger
|
||||
}
|
||||
|
||||
// Fetch service health from the materialized view. If no materialized view
|
||||
// exists, create one and start it running in a goroutine. The goroutine will
|
||||
// exit when the cache entry storing the result is expired, the cache will call
|
||||
// Close on the result.State.
|
||||
//
|
||||
// Fetch implements part of the cache.Type interface, and assumes that the
|
||||
// caller ensures that only a single call to Fetch is running at any time.
|
||||
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||
if opts.LastResult != nil && opts.LastResult.State != nil {
|
||||
return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
|
||||
}
|
||||
|
||||
srvReq := req.(*structs.ServiceSpecificRequest)
|
||||
newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
|
||||
func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest {
|
||||
return func(index uint64) pbsubscribe.SubscribeRequest {
|
||||
req := pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: srvReq.ServiceName,
|
||||
|
@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|||
}
|
||||
return req
|
||||
}
|
||||
|
||||
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
|
||||
if err != nil {
|
||||
return cache.FetchResult{}, err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
go materializer.Run(ctx)
|
||||
|
||||
state := &streamingHealthState{
|
||||
materializer: materializer,
|
||||
done: ctx.Done(),
|
||||
cancel: cancel,
|
||||
}
|
||||
return state.Fetch(opts)
|
||||
}
|
||||
|
||||
func newMaterializer(
|
||||
deps MaterializerDeps,
|
||||
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
|
||||
req *structs.ServiceSpecificRequest,
|
||||
) (*submatview.Materializer, error) {
|
||||
view, err := newHealthView(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return submatview.NewMaterializer(submatview.Deps{
|
||||
View: view,
|
||||
Client: deps.Client,
|
||||
Logger: deps.Logger,
|
||||
Waiter: &retry.Waiter{
|
||||
MinFailures: 1,
|
||||
// Start backing off with small increments (200-400ms) which will double
|
||||
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
|
||||
// after that). (retry.Wait applies Max limit after jitter right now).
|
||||
Factor: 200 * time.Millisecond,
|
||||
MinWait: 0,
|
||||
MaxWait: 60 * time.Second,
|
||||
Jitter: retry.NewJitter(100),
|
||||
},
|
||||
Request: newRequestFn,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// streamingHealthState wraps a Materializer to manage its lifecycle, and to
|
||||
// add itself to the FetchResult.State.
|
||||
type streamingHealthState struct {
|
||||
materializer *submatview.Materializer
|
||||
done <-chan struct{}
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (s *streamingHealthState) Close() error {
|
||||
s.cancel()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
|
||||
result, err := s.materializer.getFromView(s.done, opts)
|
||||
result.State = s
|
||||
return result, err
|
||||
}
|
||||
|
||||
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
|
||||
func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
|
||||
fe, err := newFilterEvaluator(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -197,7 +90,7 @@ type filterEvaluator interface {
|
|||
Evaluate(datum interface{}) (bool, error)
|
||||
}
|
||||
|
||||
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
|
||||
func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
|
||||
var evaluators []filterEvaluator
|
||||
|
||||
typ := reflect.TypeOf(structs.CheckServiceNode{})
|
|
@ -589,7 +589,7 @@ func TestNewFilterEvaluator(t *testing.T) {
|
|||
}
|
||||
|
||||
fn := func(t *testing.T, tc testCase) {
|
||||
e, err := newFilterEvaluator(&tc.req)
|
||||
e, err := newFilterEvaluator(tc.req)
|
||||
require.NoError(t, err)
|
||||
actual, err := e.Evaluate(tc.data)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -8,31 +8,27 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
grpcresolver "google.golang.org/grpc/resolver"
|
||||
|
||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
"github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
|
@ -46,6 +42,7 @@ type BaseDeps struct {
|
|||
MetricsHandler MetricsHandler
|
||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||
Cache *cache.Cache
|
||||
ViewStore *submatview.Store
|
||||
}
|
||||
|
||||
// MetricsHandler provides an http.Handler for displaying metrics.
|
||||
|
@ -100,6 +97,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
cfg.Cache.Logger = d.Logger.Named("cache")
|
||||
// cache-types are not registered yet, but they won't be used until the components are started.
|
||||
d.Cache = cache.New(cfg.Cache)
|
||||
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
|
||||
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
|
||||
|
||||
builder := resolver.NewServerResolverBuilder(resolver.Config{})
|
||||
|
@ -122,33 +120,9 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
return d, err
|
||||
}
|
||||
|
||||
if err := registerCacheTypes(d); err != nil {
|
||||
return d, err
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// registerCacheTypes on bd.Cache.
|
||||
//
|
||||
// Note: most cache types are still registered in Agent.registerCache. This
|
||||
// function is for registering newer cache-types which no longer have a dependency
|
||||
// on Agent.
|
||||
func registerCacheTypes(bd BaseDeps) error {
|
||||
if bd.RuntimeConfig.UseStreamingBackend {
|
||||
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
matDeps := cachetype.MaterializerDeps{
|
||||
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
|
||||
Logger: bd.Logger,
|
||||
}
|
||||
bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
|
||||
var rpcSrcAddr *net.TCPAddr
|
||||
if !ipaddr.IsAny(config.RPCBindAddr) {
|
||||
|
|
Loading…
Reference in New Issue