diff --git a/agent/agent.go b/agent/agent.go index a74eae9ce..9edeb5344 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -17,15 +17,16 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/dns" - "github.com/hashicorp/consul/agent/token" + "github.com/armon/go-metrics" "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" - + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "golang.org/x/net/http2" "google.golang.org/grpc" - "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/cache" @@ -33,10 +34,13 @@ import ( "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" + "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" @@ -46,10 +50,6 @@ import ( "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-multierror" - "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" - "golang.org/x/net/http2" ) const ( @@ -311,6 +311,10 @@ type Agent struct { // they can update their internal state. configReloaders []ConfigReloader + // TODO: pass directly to HTTPHandlers and DNSServer once those are passed + // into Agent, which will allow us to remove this field. + rpcClientHealth *health.Client + // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent } @@ -355,6 +359,8 @@ func New(bd BaseDeps) (*Agent, error) { cache: bd.Cache, } + a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a} + a.serviceManager = NewServiceManager(&a) // TODO: do this somewhere else, maybe move to newBaseDeps diff --git a/agent/dns.go b/agent/dns.go index 686ce8b20..72e622324 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -13,6 +13,9 @@ import ( metrics "github.com/armon/go-metrics" radix "github.com/armon/go-radix" "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/hashicorp/go-hclog" + "github.com/miekg/dns" + cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/config" agentdns "github.com/hashicorp/consul/agent/dns" @@ -21,8 +24,6 @@ import ( "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" - "github.com/miekg/dns" ) const ( @@ -1159,49 +1160,18 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st ServiceTags: []string{lookup.Tag}, TagFilter: lookup.Tag != "", QueryOptions: structs.QueryOptions{ - Token: d.agent.tokens.UserToken(), - AllowStale: cfg.AllowStale, - MaxAge: cfg.CacheMaxAge, + Token: d.agent.tokens.UserToken(), + AllowStale: cfg.AllowStale, + MaxAge: cfg.CacheMaxAge, + UseCache: cfg.UseCache, + MaxStaleDuration: cfg.MaxStale, }, EnterpriseMeta: lookup.EnterpriseMeta, } - var out structs.IndexedCheckServiceNodes - - if cfg.UseCache { - raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.HealthServicesName, &args) - if err != nil { - return out, err - } - reply, ok := raw.(*structs.IndexedCheckServiceNodes) - if !ok { - // This should never happen, but we want to protect against panics - return out, fmt.Errorf("internal error: response type not correct") - } - d.logger.Trace("cache results for service", - "cache_hit", m.Hit, - "service", lookup.Service, - ) - - out = *reply - } else { - if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return out, err - } - } - - if args.AllowStale && out.LastContact > staleCounterThreshold { - metrics.IncrCounter([]string{"dns", "stale_queries"}, 1) - } - - // redo the request the response was too stale - if args.AllowStale && out.LastContact > cfg.MaxStale { - args.AllowStale = false - d.logger.Warn("Query results too stale, re-requesting") - - if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return structs.IndexedCheckServiceNodes{}, err - } + out, _, err := d.agent.rpcClientHealth.ServiceNodes(context.TODO(), args) + if err != nil { + return out, err } // Filter out any service nodes due to health checks diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index c26c685fd..af935f1ba 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" ) @@ -220,35 +219,21 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - // Make the RPC request - var out structs.IndexedCheckServiceNodes - defer setMeta(resp, &out.QueryMeta) + // TODO: handle this for all endpoints in parseConsistency + args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && args.QueryOptions.UseCache - if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(req.Context(), cachetype.HealthServicesName, &args) - if err != nil { - return nil, err - } - defer setCacheMeta(resp, &m) - reply, ok := raw.(*structs.IndexedCheckServiceNodes) - if !ok { - // This should never happen, but we want to protect against panics - return nil, fmt.Errorf("internal error: response type not correct") - } - out = *reply - } else { - RETRY_ONCE: - if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return nil, err - } - if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact { - args.AllowStale = false - args.MaxStaleDuration = 0 - goto RETRY_ONCE - } + out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) + if err != nil { + return nil, err } + + if args.QueryOptions.UseCache { + setCacheMeta(resp, &md) + } + setMeta(resp, &out.QueryMeta) out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() + // FIXME: argument parsing should be done before performing the rpc // Filter to only passing if specified filter, err := getBoolQueryParam(params, api.HealthPassing) if err != nil { @@ -257,6 +242,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } + // FIXME: remove filterNonPassing, replace with nodes.Filter, which is used by DNSServer if filter { out.Nodes = filterNonPassing(out.Nodes) } diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go new file mode 100644 index 000000000..4c8d5f4d8 --- /dev/null +++ b/agent/rpcclient/health/health.go @@ -0,0 +1,64 @@ +package health + +import ( + "context" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/structs" +) + +type Client struct { + NetRPC NetRPC + Cache CacheGetter +} + +type NetRPC interface { + RPC(method string, args interface{}, reply interface{}) error +} + +type CacheGetter interface { + Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) +} + +func (c *Client) ServiceNodes( + ctx context.Context, + req structs.ServiceSpecificRequest, +) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { + out, md, err := c.getServiceNodes(ctx, req) + if err != nil { + return out, md, err + } + + // TODO: DNSServer emitted a metric here, do we still need it? + if req.QueryOptions.AllowStale && req.QueryOptions.MaxStaleDuration > 0 && out.QueryMeta.LastContact > req.MaxStaleDuration { + req.AllowStale = false + err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out) + return out, cache.ResultMeta{}, err + } + + return out, md, err +} + +func (c *Client) getServiceNodes( + ctx context.Context, + req structs.ServiceSpecificRequest, +) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { + var out structs.IndexedCheckServiceNodes + + if !req.QueryOptions.UseCache { + err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out) + return out, cache.ResultMeta{}, err + } + + raw, md, err := c.Cache.Get(ctx, cachetype.HealthServicesName, &req) + if err != nil { + return out, md, err + } + + value, ok := raw.(*structs.IndexedCheckServiceNodes) + if !ok { + panic("wrong response type for cachetype.HealthServicesName") + } + return *value, md, nil +}