New Cache Types (#5995)

* Add a cache type for the Catalog.ListServices endpoint

* Add a cache type for the Catalog.ListDatacenters endpoint
This commit is contained in:
Matt Keeler 2019-06-24 14:11:34 -04:00 committed by GitHub
parent 93debd2610
commit f0f28707bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 370 additions and 16 deletions

View File

@ -3738,6 +3738,20 @@ func (a *Agent) registerCache() {
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{
RPC: a,
}, &cache.RegisterOptions{
Refresh: false,
})
}
// defaultProxyCommand returns the default Connect managed proxy command.

View File

@ -0,0 +1,77 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const CatalogDatacentersName = "catalog-datacenters"
// Datacenters supports fetching discovering all the known datacenters
type CatalogDatacenters struct {
RPC RPC
}
func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a CatalogDatacentersRequest.
reqReal, ok := req.(*structs.DatacentersRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply []string
if err := c.RPC.RPC("Catalog.ListDatacenters", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
// this is a purely synthetic index to keep the caching happy.
if opts.LastResult != nil {
equal := true
previousDCs, ok := opts.LastResult.Value.(*[]string)
if ok && previousDCs == nil {
ok = false
}
if ok {
if len(reply) != len(*previousDCs) {
equal = false
} else {
// ordering matters as they should be sorted based on distance
for i, dc := range reply {
if dc != (*previousDCs)[i] {
equal = false
break
}
}
}
}
result.Index = opts.LastResult.Index
if !equal || !ok {
result.Index += 1
}
} else {
result.Index = 1
}
return result, nil
}
func (c *CatalogDatacenters) SupportsBlocking() bool {
return false
}

View File

@ -0,0 +1,96 @@
package cachetype
import (
"testing"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestCatalogDatacenters(t *testing.T) {
rpc := TestRPC(t)
typ := &CatalogDatacenters{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *[]string
var resp2 *[]string
var resp3 *[]string
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
*reply = []string{
"primary", "secondary", "tertiary",
}
resp = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
*reply = []string{
"primary", "tertiary", "secondary",
}
resp2 = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
*reply = []string{
"primary", "secondary",
}
resp3 = reply
})
// Fetch first time
result, err := typ.Fetch(cache.FetchOptions{}, &structs.DatacentersRequest{})
result2, err := typ.Fetch(cache.FetchOptions{LastResult: &result}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
result3, err := typ.Fetch(cache.FetchOptions{LastResult: &result2}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
// make sure it was called the right number of times
rpc.AssertExpectations(t)
// make sure the first result was correct
require.NoError(t, err)
require.Equal(t, result, cache.FetchResult{
Value: resp,
Index: 1,
})
// validate the second result
require.NoError(t, err)
require.Equal(t, result2, cache.FetchResult{
Value: resp2,
Index: 2,
})
// validate the third result
require.NoError(t, err)
require.Equal(t, result3, cache.FetchResult{
Value: resp3,
Index: 3,
})
}
func TestDatacenters_badReqType(t *testing.T) {
rpc := TestRPC(t)
typ := &PreparedQuery{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
rpc.AssertExpectations(t)
}

View File

@ -0,0 +1,51 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const CatalogListServicesName = "catalog-list-services"
// CatalogListServices supports fetching discovering service names via the catalog.
type CatalogListServices struct {
RPC RPC
}
func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedServices
if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CatalogListServices) SupportsBlocking() bool {
return true
}

View File

@ -0,0 +1,62 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestCatalogListServices(t *testing.T) {
rpc := TestRPC(t)
typ := &CatalogListServices{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServices
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedServices)
reply.Services = map[string][]string{
"foo": []string{"prod", "linux"},
"bar": []string{"qa", "windows"},
}
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.DCSpecificRequest{
Datacenter: "dc1",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
rpc.AssertExpectations(t)
}
func TestCatalogListServices_badReqType(t *testing.T) {
rpc := TestRPC(t)
typ := &CatalogServices{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
rpc.AssertExpectations(t)
}

View File

@ -74,12 +74,33 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_datacenters"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
args := structs.DatacentersRequest{}
s.parseConsistency(resp, req, &args.QueryOptions)
parseCacheControl(resp, req, &args.QueryOptions)
var out []string
if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
if args.QueryOptions.UseCache {
raw, m, err := s.agent.cache.Get(cachetype.CatalogDatacentersName, &args)
if err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
reply, ok := raw.(*[]string)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
defer setCacheMeta(resp, &m)
out = *reply
} else {
if err := s.agent.RPC("Catalog.ListDatacenters", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
}
metrics.IncrCounterWithLabels([]string{"client", "api", "success", "catalog_datacenters"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return out, nil
@ -133,10 +154,25 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var out structs.IndexedServices
defer setMeta(resp, &out.QueryMeta)
RETRY_ONCE:
if args.QueryOptions.UseCache {
raw, m, err := s.agent.cache.Get(cachetype.CatalogListServicesName, &args)
if err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
return nil, err
}
reply, ok := raw.(*structs.IndexedServices)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
defer setCacheMeta(resp, &m)
out = *reply
} else {
RETRY_ONCE:
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
@ -147,6 +183,8 @@ RETRY_ONCE:
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
}
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// Use empty map instead of nil

View File

@ -200,7 +200,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
}
// ListDatacenters is used to query for the list of known datacenters
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]string) error {
dcs, err := c.srv.router.GetDatacentersByDistance()
if err != nil {
return err

View File

@ -321,6 +321,22 @@ type QuerySource struct {
Ip string
}
type DatacentersRequest struct {
QueryOptions
}
func (r *DatacentersRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Token: "",
Datacenter: "",
MinIndex: 0,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
Key: "catalog-datacenters", // must not be empty for cache to work
}
}
// DCSpecificRequest is used to query about a specific DC
type DCSpecificRequest struct {
Datacenter string