rpcclient/health: move all backend routing logic to client
This commit is contained in:
parent
318bbd3e30
commit
0ea49c3e65
|
@ -219,9 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress && args.Source.Node == ""
|
|
||||||
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
|
|
||||||
|
|
||||||
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
|
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -8,16 +8,14 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/submatview"
|
"github.com/hashicorp/consul/agent/submatview"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TODO: godoc
|
||||||
type Client struct {
|
type Client struct {
|
||||||
NetRPC NetRPC
|
NetRPC NetRPC
|
||||||
Cache CacheGetter
|
Cache CacheGetter
|
||||||
ViewStore MaterializedViewStore
|
ViewStore MaterializedViewStore
|
||||||
MaterializerDeps MaterializerDeps
|
MaterializerDeps MaterializerDeps
|
||||||
// CacheName to use for service health.
|
CacheName string
|
||||||
CacheName string
|
UseStreamingBackend bool
|
||||||
// CacheNameNotStreaming is the name of the cache type to use for any requests
|
|
||||||
// that are not supported by the streaming backend (ex: Ingress=true).
|
|
||||||
CacheNameNotStreaming string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type NetRPC interface {
|
type NetRPC interface {
|
||||||
|
@ -38,6 +36,15 @@ func (c *Client) ServiceNodes(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req structs.ServiceSpecificRequest,
|
req structs.ServiceSpecificRequest,
|
||||||
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
||||||
|
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
|
||||||
|
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
|
||||||
|
if err != nil {
|
||||||
|
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
|
||||||
|
}
|
||||||
|
// TODO: can we store non-pointer
|
||||||
|
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
|
||||||
|
}
|
||||||
|
|
||||||
out, md, err := c.getServiceNodes(ctx, req)
|
out, md, err := c.getServiceNodes(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, md, err
|
return out, md, err
|
||||||
|
@ -58,34 +65,12 @@ func (c *Client) getServiceNodes(
|
||||||
req structs.ServiceSpecificRequest,
|
req structs.ServiceSpecificRequest,
|
||||||
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
|
||||||
// TODO: if UseStreaming, elif !UseCache, else cache
|
|
||||||
|
|
||||||
if !req.QueryOptions.UseCache {
|
if !req.QueryOptions.UseCache {
|
||||||
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
||||||
return out, cache.ResultMeta{}, err
|
return out, cache.ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Source.Node == "" {
|
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
|
||||||
sr := serviceRequest{
|
|
||||||
ServiceSpecificRequest: req,
|
|
||||||
deps: c.MaterializerDeps,
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := c.ViewStore.Get(ctx, sr)
|
|
||||||
if err != nil {
|
|
||||||
return out, cache.ResultMeta{}, err
|
|
||||||
}
|
|
||||||
// TODO: can we store non-pointer
|
|
||||||
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cacheName := c.CacheName
|
|
||||||
if req.Ingress || req.Source.Node != "" {
|
|
||||||
cacheName = c.CacheNameNotStreaming
|
|
||||||
}
|
|
||||||
|
|
||||||
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, md, err
|
return out, md, err
|
||||||
}
|
}
|
||||||
|
@ -104,11 +89,23 @@ func (c *Client) Notify(
|
||||||
correlationID string,
|
correlationID string,
|
||||||
ch chan<- cache.UpdateEvent,
|
ch chan<- cache.UpdateEvent,
|
||||||
) error {
|
) error {
|
||||||
cacheName := c.CacheName
|
if c.useStreaming(req) {
|
||||||
if req.Ingress || req.Source.Node != "" {
|
sr := c.newServiceRequest(req)
|
||||||
cacheName = c.CacheNameNotStreaming
|
return c.ViewStore.Notify(ctx, sr, correlationID, ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
|
||||||
|
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
|
||||||
|
return serviceRequest{
|
||||||
|
ServiceSpecificRequest: req,
|
||||||
|
deps: c.MaterializerDeps,
|
||||||
}
|
}
|
||||||
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type serviceRequest struct {
|
type serviceRequest struct {
|
||||||
|
|
|
@ -0,0 +1,235 @@
|
||||||
|
package health
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/submatview"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
req structs.ServiceSpecificRequest
|
||||||
|
expected func(t *testing.T, c *Client)
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
c := &Client{
|
||||||
|
NetRPC: &fakeNetRPC{},
|
||||||
|
Cache: &fakeCache{},
|
||||||
|
ViewStore: &fakeViewStore{},
|
||||||
|
CacheName: "cache-no-streaming",
|
||||||
|
UseStreamingBackend: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err := c.ServiceNodes(context.Background(), tc.req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
tc.expected(t, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "rpc by default",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
},
|
||||||
|
expected: useRPC,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use streaming instead of cache",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
QueryOptions: structs.QueryOptions{UseCache: true},
|
||||||
|
},
|
||||||
|
expected: useStreaming,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use streaming for MinQueryIndex",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
|
||||||
|
},
|
||||||
|
expected: useStreaming,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use cache for ingress request",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
QueryOptions: structs.QueryOptions{UseCache: true},
|
||||||
|
Ingress: true,
|
||||||
|
},
|
||||||
|
expected: useCache,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use cache for near request",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
QueryOptions: structs.QueryOptions{UseCache: true},
|
||||||
|
Source: structs.QuerySource{Node: "node1"},
|
||||||
|
},
|
||||||
|
expected: useCache,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func useRPC(t *testing.T, c *Client) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rpc, ok := c.NetRPC.(*fakeNetRPC)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
|
||||||
|
|
||||||
|
cache, ok := c.Cache.(*fakeCache)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
|
||||||
|
|
||||||
|
store, ok := c.ViewStore.(*fakeViewStore)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
|
||||||
|
|
||||||
|
require.Len(t, cache.calls, 0)
|
||||||
|
require.Len(t, store.calls, 0)
|
||||||
|
require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls)
|
||||||
|
}
|
||||||
|
|
||||||
|
func useStreaming(t *testing.T, c *Client) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rpc, ok := c.NetRPC.(*fakeNetRPC)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
|
||||||
|
|
||||||
|
cache, ok := c.Cache.(*fakeCache)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
|
||||||
|
|
||||||
|
store, ok := c.ViewStore.(*fakeViewStore)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
|
||||||
|
|
||||||
|
require.Len(t, cache.calls, 0)
|
||||||
|
require.Len(t, rpc.calls, 0)
|
||||||
|
require.Len(t, store.calls, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func useCache(t *testing.T, c *Client) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
rpc, ok := c.NetRPC.(*fakeNetRPC)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
|
||||||
|
|
||||||
|
cache, ok := c.Cache.(*fakeCache)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
|
||||||
|
|
||||||
|
store, ok := c.ViewStore.(*fakeViewStore)
|
||||||
|
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
|
||||||
|
|
||||||
|
require.Len(t, rpc.calls, 0)
|
||||||
|
require.Len(t, store.calls, 0)
|
||||||
|
require.Equal(t, []string{"cache-no-streaming"}, cache.calls)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeCache struct {
|
||||||
|
calls []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) {
|
||||||
|
f.calls = append(f.calls, t)
|
||||||
|
result := &structs.IndexedCheckServiceNodes{}
|
||||||
|
return result, cache.ResultMeta{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error {
|
||||||
|
f.calls = append(f.calls, t)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeNetRPC struct {
|
||||||
|
calls []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeNetRPC) RPC(method string, _ interface{}, _ interface{}) error {
|
||||||
|
f.calls = append(f.calls, method)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeViewStore struct {
|
||||||
|
calls []submatview.Request
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) {
|
||||||
|
f.calls = append(f.calls, req)
|
||||||
|
return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error {
|
||||||
|
f.calls = append(f.calls, req)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_Notify_BackendRouting(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
req structs.ServiceSpecificRequest
|
||||||
|
expected func(t *testing.T, c *Client)
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
c := &Client{
|
||||||
|
NetRPC: &fakeNetRPC{},
|
||||||
|
Cache: &fakeCache{},
|
||||||
|
ViewStore: &fakeViewStore{},
|
||||||
|
CacheName: "cache-no-streaming",
|
||||||
|
UseStreamingBackend: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.Notify(context.Background(), tc.req, "cid", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
tc.expected(t, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
var testCases = []testCase{
|
||||||
|
{
|
||||||
|
name: "streaming by default",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
},
|
||||||
|
expected: useStreaming,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use cache for ingress request",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
Ingress: true,
|
||||||
|
},
|
||||||
|
expected: useCache,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use cache for near request",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
Source: structs.QuerySource{Node: "node1"},
|
||||||
|
},
|
||||||
|
expected: useCache,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue