Allow DNS interface to use agent cache (#5300)

Adds two new configuration parameters "dns_config.use_cache" and
"dns_config.cache_max_age" controlling how DNS requests use the agent
cache when querying servers.
This commit is contained in:
Aestek 2019-02-25 20:06:01 +01:00 committed by Matt Keeler
parent ae1cb27126
commit f8a28d13dd
10 changed files with 308 additions and 53 deletions

View File

@ -20,11 +20,11 @@ import (
"google.golang.org/grpc"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/cache-types"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
@ -42,8 +42,8 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-uuid"
multierror "github.com/hashicorp/go-multierror"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@ -3468,6 +3468,15 @@ func (a *Agent) registerCache() {
// Prepared queries don't support blocking
Refresh: false,
})
a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
}
// defaultProxyCommand returns the default Connect managed proxy command.

View File

@ -0,0 +1,52 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const NodeServicesName = "node-services"
// NodeServices supports fetching discovering service instances via the
// catalog.
type NodeServices struct {
RPC RPC
}
func (c *NodeServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.NodeSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Allways allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedNodeServices
if err := c.RPC.RPC("Catalog.NodeServices", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *NodeServices) SupportsBlocking() bool {
return true
}

View File

@ -0,0 +1,71 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestNodeServices(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &NodeServices{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedNodeServices
rpc.On("RPC", "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.NodeSpecificRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("node-01", req.Node)
require.True(req.AllowStale)
reply := args.Get(2).(*structs.IndexedNodeServices)
reply.NodeServices = &structs.NodeServices{
Node: &structs.Node{
ID: "abcdef",
Node: "node-01",
Address: "127.0.0.5",
Datacenter: "dc1",
},
}
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "node-01",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
}
func TestNodeServices_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &NodeServices{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")
}

View File

@ -715,6 +715,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
DNSSOA: soa,
DNSUDPAnswerLimit: b.intVal(c.DNS.UDPAnswerLimit),
DNSNodeMetaTXT: b.boolValWithDefault(c.DNS.NodeMetaTXT, true),
DNSUseCache: b.boolVal(c.DNS.UseCache),
DNSCacheMaxAge: b.durationVal("dns_config.cache_max_age", c.DNS.CacheMaxAge),
// HTTP
HTTPPort: httpPort,

View File

@ -555,6 +555,8 @@ type DNS struct {
UDPAnswerLimit *int `json:"udp_answer_limit,omitempty" hcl:"udp_answer_limit" mapstructure:"udp_answer_limit"`
NodeMetaTXT *bool `json:"enable_additional_node_meta_txt,omitempty" hcl:"enable_additional_node_meta_txt" mapstructure:"enable_additional_node_meta_txt"`
SOA *SOA `json:"soa,omitempty" hcl:"soa" mapstructure:"soa"`
UseCache *bool `json:"use_cache,omitempty" hcl:"use_cache" mapstructure:"use_cache"`
CacheMaxAge *string `json:"cache_max_age,omitempty" hcl:"cache_max_age" mapstructure:"cache_max_age"`
}
type HTTPConfig struct {

View File

@ -314,6 +314,16 @@ type RuntimeConfig struct {
// flag: -recursor string [-recursor string]
DNSRecursors []string
// DNSUseCache wether or not to use cache for dns queries
//
// hcl: dns_config { use_cache = (true|false) }
DNSUseCache bool
// DNSUseCache wether or not to use cache for dns queries
//
// hcl: dns_config { cache_max_age = "duration" }
DNSCacheMaxAge time.Duration
// HTTPBlockEndpoints is a list of endpoint prefixes to block in the
// HTTP API. Any requests to these will get a 403 response.
//

View File

@ -3066,7 +3066,9 @@ func TestFullConfig(t *testing.T) {
"service_ttl": {
"*": "32030s"
},
"udp_answer_limit": 29909
"udp_answer_limit": 29909,
"use_cache": true,
"cache_max_age": "5m"
},
"enable_acl_replication": true,
"enable_agent_tls_for_checks": true,
@ -3620,6 +3622,8 @@ func TestFullConfig(t *testing.T) {
"*" = "32030s"
}
udp_answer_limit = 29909
use_cache = true
cache_max_age = "5m"
}
enable_acl_replication = true
enable_agent_tls_for_checks = true
@ -4249,6 +4253,8 @@ func TestFullConfig(t *testing.T) {
DNSServiceTTL: map[string]time.Duration{"*": 32030 * time.Second},
DNSUDPAnswerLimit: 29909,
DNSNodeMetaTXT: true,
DNSUseCache: true,
DNSCacheMaxAge: 5 * time.Minute,
DataDir: dataDir,
Datacenter: "rzo029wg",
DevMode: true,
@ -5043,6 +5049,8 @@ func TestSanitize(t *testing.T) {
"Minttl": 0
},
"DNSUDPAnswerLimit": 0,
"DNSUseCache": false,
"DNSCacheMaxAge": "0s",
"DataDir": "",
"Datacenter": "",
"DevMode": false,

View File

@ -11,9 +11,10 @@ import (
"regexp"
"github.com/armon/go-metrics"
"github.com/armon/go-radix"
metrics "github.com/armon/go-metrics"
radix "github.com/armon/go-radix"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
@ -53,6 +54,8 @@ type dnsConfig struct {
Datacenter string
EnableTruncate bool
MaxStale time.Duration
UseCache bool
CacheMaxAge time.Duration
NodeName string
NodeTTL time.Duration
OnlyPassing bool
@ -140,6 +143,8 @@ func GetDNSConfig(conf *config.RuntimeConfig) *dnsConfig {
ServiceTTL: conf.DNSServiceTTL,
UDPAnswerLimit: conf.DNSUDPAnswerLimit,
NodeMetaTXT: conf.DNSNodeMetaTXT,
UseCache: conf.DNSUseCache,
CacheMaxAge: conf.DNSCacheMaxAge,
dnsSOAConfig: dnsSOAConfig{
Expire: conf.DNSSOA.Expire,
Minttl: conf.DNSSOA.Minttl,
@ -647,7 +652,7 @@ func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.
}
// Make an RPC request
args := structs.NodeSpecificRequest{
args := &structs.NodeSpecificRequest{
Datacenter: datacenter,
Node: node,
QueryOptions: structs.QueryOptions{
@ -655,25 +660,13 @@ func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.
AllowStale: d.config.AllowStale,
},
}
var out structs.IndexedNodeServices
RPC:
if err := d.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
out, err := d.lookupNode(args)
if err != nil {
d.logger.Printf("[ERR] dns: rpc error: %v", err)
resp.SetRcode(req, dns.RcodeServerFailure)
return
}
// Verify that request is not too stale, redo the request
if args.AllowStale {
if out.LastContact > d.config.MaxStale {
args.AllowStale = false
d.logger.Printf("[WARN] dns: Query results too stale, re-requesting")
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}
// If we have no address, return not found!
if out.NodeServices == nil {
d.addSOA(resp)
@ -705,6 +698,43 @@ RPC:
}
}
func (d *DNSServer) lookupNode(args *structs.NodeSpecificRequest) (*structs.IndexedNodeServices, error) {
var out structs.IndexedNodeServices
useCache := d.config.UseCache
RPC:
if useCache {
raw, _, err := d.agent.cache.Get(cachetype.NodeServicesName, args)
if err != nil {
return nil, err
}
reply, ok := raw.(*structs.IndexedNodeServices)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
out = *reply
} else {
if err := d.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
return nil, err
}
}
// Verify that request is not too stale, redo the request
if args.AllowStale {
if out.LastContact > d.config.MaxStale {
args.AllowStale = false
useCache = false
d.logger.Printf("[WARN] dns: Query results too stale, re-requesting")
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}
return &out, nil
}
// encodeKVasRFC1464 encodes a key-value pair according to RFC1464
func encodeKVasRFC1464(key, value string) (txt string) {
// For details on these replacements c.f. https://www.ietf.org/rfc/rfc1464.txt
@ -1030,12 +1060,29 @@ func (d *DNSServer) lookupServiceNodes(datacenter, service, tag string, connect
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
AllowStale: d.config.AllowStale,
MaxAge: d.config.CacheMaxAge,
},
}
var out structs.IndexedCheckServiceNodes
if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return structs.IndexedCheckServiceNodes{}, err
if d.config.UseCache {
raw, m, err := d.agent.cache.Get(cachetype.HealthServicesName, &args)
if err != nil {
return out, err
}
reply, ok := raw.(*structs.IndexedCheckServiceNodes)
if !ok {
// This should never happen, but we want to protect against panics
return out, fmt.Errorf("internal error: response type not correct")
}
d.logger.Printf("[TRACE] dns: cache hit: %v for service %s", m.Hit, service)
out = *reply
} else {
if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return out, err
}
}
if args.AllowStale && out.LastContact > staleCounterThreshold {
@ -1122,6 +1169,7 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
AllowStale: d.config.AllowStale,
MaxAge: d.config.CacheMaxAge,
},
// Always pass the local agent through. In the DNS interface, there
@ -1150,6 +1198,20 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot
}
}
out, err := d.lookupPreparedQuery(args)
// If they give a bogus query name, treat that as a name error,
// not a full on server error. We have to use a string compare
// here since the RPC layer loses the type information.
if err != nil && err.Error() == consul.ErrQueryNotFound.Error() {
d.addSOA(resp)
resp.SetRcode(req, dns.RcodeNameError)
return
} else if err != nil {
resp.SetRcode(req, dns.RcodeServerFailure)
return
}
// TODO (slackpad) - What's a safe limit we can set here? It seems like
// with dup filtering done at this level we need to get everything to
// match the previous behavior. We can optimize by pushing more filtering
@ -1158,34 +1220,6 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot
// likely work in practice, like 10*maxUDPAnswerLimit which should help
// reduce bandwidth if there are thousands of nodes available.
var out structs.PreparedQueryExecuteResponse
RPC:
if err := d.agent.RPC("PreparedQuery.Execute", &args, &out); err != nil {
// If they give a bogus query name, treat that as a name error,
// not a full on server error. We have to use a string compare
// here since the RPC layer loses the type information.
if err.Error() == consul.ErrQueryNotFound.Error() {
d.addSOA(resp)
resp.SetRcode(req, dns.RcodeNameError)
return
}
d.logger.Printf("[ERR] dns: rpc error: %v", err)
resp.SetRcode(req, dns.RcodeServerFailure)
return
}
// Verify that request is not too stale, redo the request.
if args.AllowStale {
if out.LastContact > d.config.MaxStale {
args.AllowStale = false
d.logger.Printf("[WARN] dns: Query results too stale, re-requesting")
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}
// Determine the TTL. The parse should never fail since we vet it when
// the query is created, but we check anyway. If the query didn't
// specify a TTL then we will try to use the agent's service-specific
@ -1225,6 +1259,44 @@ RPC:
}
}
func (d *DNSServer) lookupPreparedQuery(args structs.PreparedQueryExecuteRequest) (*structs.PreparedQueryExecuteResponse, error) {
var out structs.PreparedQueryExecuteResponse
RPC:
if d.config.UseCache {
raw, m, err := d.agent.cache.Get(cachetype.PreparedQueryName, &args)
if err != nil {
return nil, err
}
reply, ok := raw.(*structs.PreparedQueryExecuteResponse)
if !ok {
// This should never happen, but we want to protect against panics
return nil, err
}
d.logger.Printf("[TRACE] dns: cache hit: %v for prepared query %s", m.Hit, args.QueryIDOrName)
out = *reply
} else {
if err := d.agent.RPC("PreparedQuery.Execute", &args, &out); err != nil {
return nil, err
}
}
// Verify that request is not too stale, redo the request.
if args.AllowStale {
if out.LastContact > d.config.MaxStale {
args.AllowStale = false
d.logger.Printf("[WARN] dns: Query results too stale, re-requesting")
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}
return &out, nil
}
// serviceNodeRecords is used to add the node records for a service lookup
func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration, maxRecursionLevel int) {
qName := req.Question[0].Name

View File

@ -16,7 +16,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/hashstructure"
)
@ -431,6 +431,29 @@ func (r *NodeSpecificRequest) RequestDatacenter() string {
return r.Datacenter
}
func (r *NodeSpecificRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}
v, err := hashstructure.Hash([]interface{}{
r.Node,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
info.Key = strconv.FormatUint(v, 10)
}
return info
}
// ChecksInStateRequest is used to query for nodes in a state
type ChecksInStateRequest struct {
Datacenter string

View File

@ -1072,6 +1072,12 @@ default will automatically work with some tooling.
Configures the Retry duration expressed in seconds, default value is
600, ie: 10 minutes.
* <a name="dns_use_cache"></a><a href="dns_use_cache">`use_cache`</a> - When set to true, DNS resolution will use the agent cache described
in [agent caching](/api/index.html#agent-caching). This setting affects all service and prepared queries DNS requests. Implies [`allow_stale`](#allow_stale)
* <a name="dns_cache_max_age"></a><a href="dns_cache_max_age">`dns_max_age`</a> - When [use_cache](#dns_use_cache) is enabled, the agent
will attempt to re-fetch the result from the servers if the cached value is older than this duration. See: [agent caching](/api/index.html#agent-caching).
* <a name="domain"></a><a href="#domain">`domain`</a> Equivalent to the
[`-domain` command-line flag](#_domain).