diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 1ef29488f..8c42b5e85 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -5,11 +5,14 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" ) type Client struct { - NetRPC NetRPC - Cache CacheGetter + NetRPC NetRPC + Cache CacheGetter + ViewStore MaterializedViewStore + MaterializerDeps MaterializerDeps // CacheName to use for service health. CacheName string // CacheNameIngress is the name of the cache type to use for ingress @@ -26,6 +29,11 @@ type CacheGetter interface { Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error } +type MaterializedViewStore interface { + Get(ctx context.Context, req submatview.Request) (submatview.Result, error) + Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error +} + func (c *Client) ServiceNodes( ctx context.Context, req structs.ServiceSpecificRequest, @@ -56,6 +64,20 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } + if req.Source.Node == "" { + sr, err := newServiceRequest(req, c.MaterializerDeps) + if err != nil { + return out, cache.ResultMeta{}, err + } + + result, err := c.ViewStore.Get(ctx, sr) + if err != nil { + return out, cache.ResultMeta{}, err + } + // TODO: can we store non-pointer + return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err + } + cacheName := c.CacheName if req.Ingress { cacheName = c.CacheNameIngress @@ -86,3 +108,38 @@ func (c *Client) Notify( } return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch) } + +func newServiceRequest(req structs.ServiceSpecificRequest, deps MaterializerDeps) (serviceRequest, error) { + view, err := newHealthView(req) + if err != nil { + return serviceRequest{}, err + } + return serviceRequest{ + ServiceSpecificRequest: req, + view: view, + deps: deps, + }, nil +} + +type serviceRequest struct { + structs.ServiceSpecificRequest + view *healthView + deps MaterializerDeps +} + +func (r serviceRequest) CacheInfo() cache.RequestInfo { + return r.ServiceSpecificRequest.CacheInfo() +} + +func (r serviceRequest) Type() string { + return "service-health" +} + +func (r serviceRequest) NewMaterializer() *submatview.Materializer { + return submatview.NewMaterializer(submatview.Deps{ + View: r.view, + Client: r.deps.Client, + Logger: r.deps.Logger, + Request: newMaterializerRequest(r.ServiceSpecificRequest), + }) +} diff --git a/agent/cache-types/streaming_health_services.go b/agent/rpcclient/health/view.go similarity index 58% rename from agent/cache-types/streaming_health_services.go rename to agent/rpcclient/health/view.go index acaabc0a7..ef89a156e 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/rpcclient/health/view.go @@ -1,74 +1,27 @@ -package cachetype +package health import ( - "context" "fmt" "reflect" "sort" "strings" - "time" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/submatview" - "github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" ) -const ( - // Recommended name for registration. - StreamingHealthServicesName = "streaming-health-services" -) - -// StreamingHealthServices supports fetching discovering service instances via the -// catalog using the streaming gRPC endpoint. -type StreamingHealthServices struct { - RegisterOptionsBlockingRefresh - deps MaterializerDeps -} - -// RegisterOptions returns options with a much shorter LastGetTTL than the default. -// Unlike other cache-types, StreamingHealthServices runs a materialized view in -// the background which will receive streamed events from a server. If the cache -// is not being used, that stream uses memory on the server and network transfer -// between the client and the server. -// The materialize view and the stream are stopped when the cache entry expires, -// so using a shorter TTL ensures the cache entry expires sooner. -func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions { - opts := c.RegisterOptionsBlockingRefresh.RegisterOptions() - opts.LastGetTTL = 20 * time.Minute - return opts -} - -// NewStreamingHealthServices creates a cache-type for watching for service -// health results via streaming updates. -func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices { - return &StreamingHealthServices{deps: deps} -} - type MaterializerDeps struct { Client submatview.StreamClient Logger hclog.Logger } -// Fetch service health from the materialized view. If no materialized view -// exists, create one and start it running in a goroutine. The goroutine will -// exit when the cache entry storing the result is expired, the cache will call -// Close on the result.State. -// -// Fetch implements part of the cache.Type interface, and assumes that the -// caller ensures that only a single call to Fetch is running at any time. -func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { - if opts.LastResult != nil && opts.LastResult.State != nil { - return opts.LastResult.State.(*streamingHealthState).Fetch(opts) - } - - srvReq := req.(*structs.ServiceSpecificRequest) - newReqFn := func(index uint64) pbsubscribe.SubscribeRequest { +func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest { + return func(index uint64) pbsubscribe.SubscribeRequest { req := pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: srvReq.ServiceName, @@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque } return req } - - materializer, err := newMaterializer(c.deps, newReqFn, srvReq) - if err != nil { - return cache.FetchResult{}, err - } - ctx, cancel := context.WithCancel(context.TODO()) - go materializer.Run(ctx) - - state := &streamingHealthState{ - materializer: materializer, - done: ctx.Done(), - cancel: cancel, - } - return state.Fetch(opts) } -func newMaterializer( - deps MaterializerDeps, - newRequestFn func(uint64) pbsubscribe.SubscribeRequest, - req *structs.ServiceSpecificRequest, -) (*submatview.Materializer, error) { - view, err := newHealthView(req) - if err != nil { - return nil, err - } - return submatview.NewMaterializer(submatview.Deps{ - View: view, - Client: deps.Client, - Logger: deps.Logger, - Waiter: &retry.Waiter{ - MinFailures: 1, - // Start backing off with small increments (200-400ms) which will double - // each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000 - // after that). (retry.Wait applies Max limit after jitter right now). - Factor: 200 * time.Millisecond, - MinWait: 0, - MaxWait: 60 * time.Second, - Jitter: retry.NewJitter(100), - }, - Request: newRequestFn, - }), nil -} - -// streamingHealthState wraps a Materializer to manage its lifecycle, and to -// add itself to the FetchResult.State. -type streamingHealthState struct { - materializer *submatview.Materializer - done <-chan struct{} - cancel func() -} - -func (s *streamingHealthState) Close() error { - s.cancel() - return nil -} - -func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) { - result, err := s.materializer.getFromView(s.done, opts) - result.State = s - return result, err -} - -func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) { +func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) { fe, err := newFilterEvaluator(req) if err != nil { return nil, err @@ -197,7 +90,7 @@ type filterEvaluator interface { Evaluate(datum interface{}) (bool, error) } -func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) { +func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) { var evaluators []filterEvaluator typ := reflect.TypeOf(structs.CheckServiceNode{}) diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index aef699427..deb008130 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -589,7 +589,7 @@ func TestNewFilterEvaluator(t *testing.T) { } fn := func(t *testing.T, tc testCase) { - e, err := newFilterEvaluator(&tc.req) + e, err := newFilterEvaluator(tc.req) require.NoError(t, err) actual, err := e.Evaluate(tc.data) require.NoError(t, err) diff --git a/agent/setup.go b/agent/setup.go index 9a84e7d24..9a2a2db40 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -8,31 +8,27 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/consul/fsm" - "github.com/armon/go-metrics/prometheus" - - "github.com/hashicorp/consul/agent/consul/usagemetrics" - "github.com/hashicorp/consul/agent/local" - "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" grpcresolver "google.golang.org/grpc/resolver" autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc/resolver" + "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" + "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" ) @@ -46,6 +42,7 @@ type BaseDeps struct { MetricsHandler MetricsHandler AutoConfig *autoconf.AutoConfig // TODO: use an interface Cache *cache.Cache + ViewStore *submatview.Store } // MetricsHandler provides an http.Handler for displaying metrics. @@ -100,6 +97,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) cfg.Cache.Logger = d.Logger.Named("cache") // cache-types are not registered yet, but they won't be used until the components are started. d.Cache = cache.New(cfg.Cache) + d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore")) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) builder := resolver.NewServerResolverBuilder(resolver.Config{}) @@ -122,33 +120,9 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) return d, err } - if err := registerCacheTypes(d); err != nil { - return d, err - } - return d, nil } -// registerCacheTypes on bd.Cache. -// -// Note: most cache types are still registered in Agent.registerCache. This -// function is for registering newer cache-types which no longer have a dependency -// on Agent. -func registerCacheTypes(bd BaseDeps) error { - if bd.RuntimeConfig.UseStreamingBackend { - conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) - if err != nil { - return err - } - matDeps := cachetype.MaterializerDeps{ - Client: pbsubscribe.NewStateChangeSubscriptionClient(conn), - Logger: bd.Logger, - } - bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps)) - } - return nil -} - func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool { var rpcSrcAddr *net.TCPAddr if !ipaddr.IsAny(config.RPCBindAddr) {