api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled (#9967)

The streaming cache type for service health has no way to handle v1/health/ingress/:service queries as there is no equivalent topic that would return the appropriate data.

Ensure that attempts to use this endpoint will use the old cache-type for now so that they return appropriate data when streaming is enabled.
This commit is contained in:
R.B. Boyer 2021-04-05 13:23:00 -05:00 committed by GitHub
parent 4168e8dc02
commit af78561018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 23 deletions

3
.changelog/9967.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled
```

View File

@ -374,9 +374,10 @@ func New(bd BaseDeps) (*Agent, error) {
cacheName = cachetype.StreamingHealthServicesName cacheName = cachetype.StreamingHealthServicesName
} }
a.rpcClientHealth = &health.Client{ a.rpcClientHealth = &health.Client{
Cache: bd.Cache, Cache: bd.Cache,
NetRPC: &a, NetRPC: &a,
CacheName: cacheName, CacheName: cacheName,
CacheNameIngress: cachetype.HealthServicesName,
} }
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)

View File

@ -219,7 +219,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
return nil, nil return nil, nil
} }
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" { if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {

View File

@ -1418,13 +1418,20 @@ func TestHealthConnectServiceNodes(t *testing.T) {
} }
func TestHealthIngressServiceNodes(t *testing.T) { func TestHealthIngressServiceNodes(t *testing.T) {
t.Run("no streaming", func(t *testing.T) {
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = false } use_streaming_backend = false `)
})
t.Run("cache with streaming", func(t *testing.T) {
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = true } use_streaming_backend = true `)
})
}
func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
t.Parallel() a := NewTestAgent(t, agentHCL)
a := NewTestAgent(t, "")
defer a.Shutdown() defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1") testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -1461,34 +1468,64 @@ func TestHealthIngressServiceNodes(t *testing.T) {
require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB)) require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB))
require.True(t, outB) require.True(t, outB)
t.Run("associated service", func(t *testing.T) { checkResults := func(t *testing.T, obj interface{}) {
assert := assert.New(t)
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
assert.Nil(err)
assertIndex(t, resp)
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 1) require.Len(t, nodes, 1)
require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind) require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind)
require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address) require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address)
require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy) require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy)
}) }
t.Run("non-associated service", func(t *testing.T) { require.True(t, t.Run("associated service", func(t *testing.T) {
assert := assert.New(t) req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
require.NoError(t, err)
assertIndex(t, resp)
checkResults(t, obj)
}))
require.True(t, t.Run("non-associated service", func(t *testing.T) {
req, _ := http.NewRequest("GET", req, _ := http.NewRequest("GET",
"/v1/health/connect/notexist", nil) "/v1/health/connect/notexist", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req) obj, err := a.srv.HealthIngressServiceNodes(resp, req)
assert.Nil(err) require.NoError(t, err)
assertIndex(t, resp) assertIndex(t, resp)
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 0) require.Len(t, nodes, 0)
}) }))
require.True(t, t.Run("test caching miss", func(t *testing.T) {
// List instances with cache enabled
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
require.NoError(t, err)
checkResults(t, obj)
// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
}))
require.True(t, t.Run("test caching hit", func(t *testing.T) {
// List instances with cache enabled
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
require.NoError(t, err)
checkResults(t, obj)
// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
}))
} }
func TestHealthConnectServiceNodes_Filter(t *testing.T) { func TestHealthConnectServiceNodes_Filter(t *testing.T) {

View File

@ -12,6 +12,9 @@ type Client struct {
Cache CacheGetter Cache CacheGetter
// CacheName to use for service health. // CacheName to use for service health.
CacheName string CacheName string
// CacheNameIngress is the name of the cache type to use for ingress
// service health.
CacheNameIngress string
} }
type NetRPC interface { type NetRPC interface {
@ -53,7 +56,12 @@ func (c *Client) getServiceNodes(
return out, cache.ResultMeta{}, err return out, cache.ResultMeta{}, err
} }
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) cacheName := c.CacheName
if req.Ingress {
cacheName = c.CacheNameIngress
}
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
if err != nil { if err != nil {
return out, md, err return out, md, err
} }
@ -72,5 +80,9 @@ func (c *Client) Notify(
correlationID string, correlationID string,
ch chan<- cache.UpdateEvent, ch chan<- cache.UpdateEvent,
) error { ) error {
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) cacheName := c.CacheName
if req.Ingress {
cacheName = c.CacheNameIngress
}
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
} }