767 lines
23 KiB
Go
767 lines
23 KiB
Go
package consul
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/go-uuid"
|
|
)
|
|
|
|
// ErrQueryNotFound is the sentinel error handed back when a prepared query
// lookup does not match any stored query.
var ErrQueryNotFound = errors.New("Query not found")
|
|
|
|
// PreparedQuery manages the prepared query endpoint.
|
|
type PreparedQuery struct {
|
|
srv *Server
|
|
}
|
|
|
|
// Apply is used to apply a modifying request to the data store. This should
|
|
// only be used for operations that modify the data. The ID of the session is
|
|
// returned in the reply.
|
|
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
|
|
if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now())
|
|
|
|
// Validate the ID. We must create new IDs before applying to the Raft
|
|
// log since it's not deterministic.
|
|
if args.Op == structs.PreparedQueryCreate {
|
|
if args.Query.ID != "" {
|
|
return fmt.Errorf("ID must be empty when creating a new prepared query")
|
|
}
|
|
|
|
// We are relying on the fact that UUIDs are random and unlikely
|
|
// to collide since this isn't inside a write transaction.
|
|
state := p.srv.fsm.State()
|
|
for {
|
|
if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
|
|
return fmt.Errorf("UUID generation for prepared query failed: %v", err)
|
|
}
|
|
_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("Prepared query lookup failed: %v", err)
|
|
}
|
|
if query == nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
*reply = args.Query.ID
|
|
|
|
// Get the ACL token for the request for the checks below.
|
|
rule, err := p.srv.ResolveToken(args.Token)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If prefix ACLs apply to the incoming query, then do an ACL check. We
|
|
// need to make sure they have write access for whatever they are
|
|
// proposing.
|
|
if prefix, ok := args.Query.GetACLPrefix(); ok {
|
|
if rule != nil && !rule.PreparedQueryWrite(prefix) {
|
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
}
|
|
|
|
// This is the second part of the check above. If they are referencing
|
|
// an existing query then make sure it exists and that they have write
|
|
// access to whatever they are changing, if prefix ACLs apply to it.
|
|
if args.Op != structs.PreparedQueryCreate {
|
|
state := p.srv.fsm.State()
|
|
_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("Prepared Query lookup failed: %v", err)
|
|
}
|
|
if query == nil {
|
|
return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
|
|
}
|
|
|
|
if prefix, ok := query.GetACLPrefix(); ok {
|
|
if rule != nil && !rule.PreparedQueryWrite(prefix) {
|
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
}
|
|
}
|
|
|
|
// Parse the query and prep it for the state store.
|
|
switch args.Op {
|
|
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
|
if err := parseQuery(args.Query, p.srv.config.ACLEnforceVersion8); err != nil {
|
|
return fmt.Errorf("Invalid prepared query: %v", err)
|
|
}
|
|
|
|
case structs.PreparedQueryDelete:
|
|
// Nothing else to verify here, just do the delete (we only look
|
|
// at the ID field for this op).
|
|
|
|
default:
|
|
return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
|
|
}
|
|
|
|
// Commit the query to the state store.
|
|
resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
|
|
if err != nil {
|
|
p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
|
|
return err
|
|
}
|
|
if respErr, ok := resp.(error); ok {
|
|
return respErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseQuery makes sure the entries of a query are valid for a create or
|
|
// update operation. Some of the fields are not checked or are partially
|
|
// checked, as noted in the comments below. This also updates all the parsed
|
|
// fields of the query.
|
|
func parseQuery(query *structs.PreparedQuery, enforceVersion8 bool) error {
|
|
// We skip a few fields:
|
|
// - ID is checked outside this fn.
|
|
// - Name is optional with no restrictions, except for uniqueness which
|
|
// is checked for integrity during the transaction. We also make sure
|
|
// names do not overlap with IDs, which is also checked during the
|
|
// transaction. Otherwise, people could "steal" queries that they don't
|
|
// have proper ACL rights to change.
|
|
// - Template is checked during the transaction since that's where we
|
|
// compile it.
|
|
|
|
// Anonymous queries require a session or need to be part of a template.
|
|
if enforceVersion8 {
|
|
if query.Name == "" && query.Template.Type == "" && query.Session == "" {
|
|
return fmt.Errorf("Must be bound to a session")
|
|
}
|
|
}
|
|
|
|
// Token is checked when the query is executed, but we do make sure the
|
|
// user hasn't accidentally pasted-in the special redacted token name,
|
|
// which if we allowed in would be super hard to debug and understand.
|
|
if query.Token == redactedToken {
|
|
return fmt.Errorf("Bad Token '%s', it looks like a query definition with a redacted token was submitted", query.Token)
|
|
}
|
|
|
|
// Parse the service query sub-structure.
|
|
if err := parseService(&query.Service); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Parse the DNS options sub-structure.
|
|
if err := parseDNS(&query.DNS); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseService makes sure the entries of a query are valid for a create or
|
|
// update operation. Some of the fields are not checked or are partially
|
|
// checked, as noted in the comments below. This also updates all the parsed
|
|
// fields of the query.
|
|
func parseService(svc *structs.ServiceQuery) error {
|
|
// Service is required.
|
|
if svc.Service == "" {
|
|
return fmt.Errorf("Must provide a Service name to query")
|
|
}
|
|
|
|
// NearestN can be 0 which means "don't fail over by RTT".
|
|
if svc.Failover.NearestN < 0 {
|
|
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
|
|
}
|
|
|
|
// Make sure the metadata filters are valid
|
|
if err := structs.ValidateMetadata(svc.NodeMeta, true); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We skip a few fields:
|
|
// - There's no validation for Datacenters; we skip any unknown entries
|
|
// at execution time.
|
|
// - OnlyPassing is just a boolean so doesn't need further validation.
|
|
// - Tags is a free-form list of tags and doesn't need further validation.
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseDNS makes sure the entries of a query are valid for a create or
|
|
// update operation. This also updates all the parsed fields of the query.
|
|
func parseDNS(dns *structs.QueryDNSOptions) error {
|
|
if dns.TTL != "" {
|
|
ttl, err := time.ParseDuration(dns.TTL)
|
|
if err != nil {
|
|
return fmt.Errorf("Bad DNS TTL '%s': %v", dns.TTL, err)
|
|
}
|
|
|
|
if ttl < 0 {
|
|
return fmt.Errorf("DNS TTL '%d', must be >=0", ttl)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get returns a single prepared query by ID.
|
|
func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
|
|
reply *structs.IndexedPreparedQueries) error {
|
|
if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done {
|
|
return err
|
|
}
|
|
|
|
return p.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
index, query, err := state.PreparedQueryGet(ws, args.QueryID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if query == nil {
|
|
return ErrQueryNotFound
|
|
}
|
|
|
|
// If no prefix ACL applies to this query, then they are
|
|
// always allowed to see it if they have the ID. We still
|
|
// have to filter the remaining object for tokens.
|
|
reply.Index = index
|
|
reply.Queries = structs.PreparedQueries{query}
|
|
if _, ok := query.GetACLPrefix(); !ok {
|
|
return p.srv.filterACL(args.Token, &reply.Queries[0])
|
|
}
|
|
|
|
// Otherwise, attempt to filter it the usual way.
|
|
if err := p.srv.filterACL(args.Token, reply); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Since this is a GET of a specific query, if ACLs have
|
|
// prevented us from returning something that exists,
|
|
// then alert the user with a permission denied error.
|
|
if len(reply.Queries) == 0 {
|
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Request to get prepared query '%s' denied due to ACLs", args.QueryID)
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// List returns all the prepared queries.
|
|
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
|
|
if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
|
|
return err
|
|
}
|
|
|
|
return p.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
index, queries, err := state.PreparedQueryList(ws)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Index, reply.Queries = index, queries
|
|
return p.srv.filterACL(args.Token, reply)
|
|
})
|
|
}
|
|
|
|
// Explain resolves a prepared query and returns the (possibly rendered template)
|
|
// to the caller. This is useful for letting operators figure out which query is
|
|
// picking up a given name. We can also add additional info about how the query
|
|
// will be executed here.
|
|
func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
|
|
reply *structs.PreparedQueryExplainResponse) error {
|
|
if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now())
|
|
|
|
// We have to do this ourselves since we are not doing a blocking RPC.
|
|
p.srv.setQueryMeta(&reply.QueryMeta)
|
|
if args.RequireConsistent {
|
|
if err := p.srv.consistentRead(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Try to locate the query.
|
|
state := p.srv.fsm.State()
|
|
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if query == nil {
|
|
return ErrQueryNotFound
|
|
}
|
|
|
|
// Place the query into a list so we can run the standard ACL filter on
|
|
// it.
|
|
queries := &structs.IndexedPreparedQueries{
|
|
Queries: structs.PreparedQueries{query},
|
|
}
|
|
if err := p.srv.filterACL(args.Token, queries); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If the query was filtered out, return an error.
|
|
if len(queries.Queries) == 0 {
|
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Explain on prepared query '%s' denied due to ACLs", query.ID)
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
reply.Query = *(queries.Queries[0])
|
|
return nil
|
|
}
|
|
|
|
// Execute runs a prepared query and returns the results. This will perform the
|
|
// failover logic if no local results are available. This is typically called as
|
|
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
|
|
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
|
reply *structs.PreparedQueryExecuteResponse) error {
|
|
if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now())
|
|
|
|
// We have to do this ourselves since we are not doing a blocking RPC.
|
|
p.srv.setQueryMeta(&reply.QueryMeta)
|
|
if args.RequireConsistent {
|
|
if err := p.srv.consistentRead(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Try to locate the query.
|
|
state := p.srv.fsm.State()
|
|
_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if query == nil {
|
|
return ErrQueryNotFound
|
|
}
|
|
|
|
// Execute the query for the local DC.
|
|
if err := p.execute(query, reply, args.Connect); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If they supplied a token with the query, use that, otherwise use the
|
|
// token passed in with the request.
|
|
token := args.QueryOptions.Token
|
|
if query.Token != "" {
|
|
token = query.Token
|
|
}
|
|
if err := p.srv.filterACL(token, &reply.Nodes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO (slackpad) We could add a special case here that will avoid the
|
|
// fail over if we filtered everything due to ACLs. This seems like it
|
|
// might not be worth the code complexity and behavior differences,
|
|
// though, since this is essentially a misconfiguration.
|
|
|
|
// Shuffle the results in case coordinates are not available if they
|
|
// requested an RTT sort.
|
|
reply.Nodes.Shuffle()
|
|
|
|
// Build the query source. This can be provided by the client, or by
|
|
// the prepared query. Client-specified takes priority.
|
|
qs := args.Source
|
|
if qs.Datacenter == "" {
|
|
qs.Datacenter = args.Agent.Datacenter
|
|
}
|
|
if query.Service.Near != "" && qs.Node == "" {
|
|
qs.Node = query.Service.Near
|
|
}
|
|
|
|
// Respect the magic "_agent" flag.
|
|
if qs.Node == "_agent" {
|
|
qs.Node = args.Agent.Node
|
|
} else if qs.Node == "_ip" {
|
|
if args.Source.Ip != "" {
|
|
_, nodes, err := state.Nodes(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
if args.Source.Ip == node.Address {
|
|
qs.Node = node.Node
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
p.srv.logger.Printf("[WARN] Prepared Query using near=_ip requires " +
|
|
"the source IP to be set but none was provided. No distance " +
|
|
"sorting will be done.")
|
|
|
|
}
|
|
|
|
// Either a source IP was given but we couldnt find the associated node
|
|
// or no source ip was given. In both cases we should wipe the Node value
|
|
if qs.Node == "_ip" {
|
|
qs.Node = ""
|
|
}
|
|
}
|
|
|
|
// Perform the distance sort
|
|
err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If we applied a distance sort, make sure that the node queried for is in
|
|
// position 0, provided the results are from the same datacenter.
|
|
if qs.Node != "" && reply.Datacenter == qs.Datacenter {
|
|
for i, node := range reply.Nodes {
|
|
if node.Node.Node == qs.Node {
|
|
reply.Nodes[0], reply.Nodes[i] = reply.Nodes[i], reply.Nodes[0]
|
|
break
|
|
}
|
|
|
|
// Put a cap on the depth of the search. The local agent should
|
|
// never be further in than this if distance sorting was applied.
|
|
if i == 9 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply the limit if given.
|
|
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
|
|
reply.Nodes = reply.Nodes[:args.Limit]
|
|
}
|
|
|
|
// In the happy path where we found some healthy nodes we go with that
|
|
// and bail out. Otherwise, we fail over and try remote DCs, as allowed
|
|
// by the query setup.
|
|
if len(reply.Nodes) == 0 {
|
|
wrapper := &queryServerWrapper{p.srv}
|
|
if err := queryFailover(wrapper, query, args, reply); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ExecuteRemote is used when a local node doesn't have any instances of a
|
|
// service available and needs to probe remote DCs. This sends the full query
|
|
// over since the remote side won't have it in its state store, and this doesn't
|
|
// do the failover logic since that's already being run on the originating DC.
|
|
// We don't want things to fan out further than one level.
|
|
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
|
|
reply *structs.PreparedQueryExecuteResponse) error {
|
|
if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now())
|
|
|
|
// We have to do this ourselves since we are not doing a blocking RPC.
|
|
p.srv.setQueryMeta(&reply.QueryMeta)
|
|
if args.RequireConsistent {
|
|
if err := p.srv.consistentRead(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Run the query locally to see what we can find.
|
|
if err := p.execute(&args.Query, reply, args.Connect); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If they supplied a token with the query, use that, otherwise use the
|
|
// token passed in with the request.
|
|
token := args.QueryOptions.Token
|
|
if args.Query.Token != "" {
|
|
token = args.Query.Token
|
|
}
|
|
if err := p.srv.filterACL(token, &reply.Nodes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We don't bother trying to do an RTT sort here since we are by
|
|
// definition in another DC. We just shuffle to make sure that we
|
|
// balance the load across the results.
|
|
reply.Nodes.Shuffle()
|
|
|
|
// Apply the limit if given.
|
|
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
|
|
reply.Nodes = reply.Nodes[:args.Limit]
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// execute runs a prepared query in the local DC without any failover. We don't
|
|
// apply any sorting options or ACL checks at this level - it should be done up above.
|
|
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
|
reply *structs.PreparedQueryExecuteResponse,
|
|
forceConnect bool) error {
|
|
state := p.srv.fsm.State()
|
|
|
|
// If we're requesting Connect-capable services, then switch the
|
|
// lookup to be the Connect function.
|
|
f := state.CheckServiceNodes
|
|
if query.Service.Connect || forceConnect {
|
|
f = state.CheckConnectServiceNodes
|
|
}
|
|
|
|
_, nodes, err := f(nil, query.Service.Service)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Filter out any unhealthy nodes.
|
|
nodes = nodes.FilterIgnore(query.Service.OnlyPassing,
|
|
query.Service.IgnoreCheckIDs)
|
|
|
|
// Apply the node metadata filters, if any.
|
|
if len(query.Service.NodeMeta) > 0 {
|
|
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
|
}
|
|
|
|
// Apply the service metadata filters, if any.
|
|
if len(query.Service.ServiceMeta) > 0 {
|
|
nodes = serviceMetaFilter(query.Service.ServiceMeta, nodes)
|
|
}
|
|
|
|
// Apply the tag filters, if any.
|
|
if len(query.Service.Tags) > 0 {
|
|
nodes = tagFilter(query.Service.Tags, nodes)
|
|
}
|
|
|
|
// Capture the nodes and pass the DNS information through to the reply.
|
|
reply.Service = query.Service.Service
|
|
reply.Nodes = nodes
|
|
reply.DNS = query.DNS
|
|
|
|
// Stamp the result for this datacenter.
|
|
reply.Datacenter = p.srv.config.Datacenter
|
|
|
|
return nil
|
|
}
|
|
|
|
// tagFilter returns a list of nodes who satisfy the given tags. Nodes must have
|
|
// ALL the given tags, and NONE of the forbidden tags (prefixed with !). Note
|
|
// for performance this modifies the original slice.
|
|
func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
|
// Build up lists of required and disallowed tags.
|
|
must, not := make([]string, 0), make([]string, 0)
|
|
for _, tag := range tags {
|
|
tag = strings.ToLower(tag)
|
|
if strings.HasPrefix(tag, "!") {
|
|
tag = tag[1:]
|
|
not = append(not, tag)
|
|
} else {
|
|
must = append(must, tag)
|
|
}
|
|
}
|
|
|
|
n := len(nodes)
|
|
for i := 0; i < n; i++ {
|
|
node := nodes[i]
|
|
|
|
// Index the tags so lookups this way are cheaper.
|
|
index := make(map[string]struct{})
|
|
if node.Service != nil {
|
|
for _, tag := range node.Service.Tags {
|
|
tag = strings.ToLower(tag)
|
|
index[tag] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Bail if any of the required tags are missing.
|
|
for _, tag := range must {
|
|
if _, ok := index[tag]; !ok {
|
|
goto DELETE
|
|
}
|
|
}
|
|
|
|
// Bail if any of the disallowed tags are present.
|
|
for _, tag := range not {
|
|
if _, ok := index[tag]; ok {
|
|
goto DELETE
|
|
}
|
|
}
|
|
|
|
// At this point, the service is ok to leave in the list.
|
|
continue
|
|
|
|
DELETE:
|
|
nodes[i], nodes[n-1] = nodes[n-1], structs.CheckServiceNode{}
|
|
n--
|
|
i--
|
|
}
|
|
return nodes[:n]
|
|
}
|
|
|
|
// nodeMetaFilter returns a list of the nodes who satisfy the given metadata filters. Nodes
|
|
// must have ALL the given tags.
|
|
func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
|
var filtered structs.CheckServiceNodes
|
|
for _, node := range nodes {
|
|
if structs.SatisfiesMetaFilters(node.Node.Meta, filters) {
|
|
filtered = append(filtered, node)
|
|
}
|
|
}
|
|
return filtered
|
|
}
|
|
|
|
func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
|
var filtered structs.CheckServiceNodes
|
|
for _, node := range nodes {
|
|
if structs.SatisfiesMetaFilters(node.Service.Meta, filters) {
|
|
filtered = append(filtered, node)
|
|
}
|
|
}
|
|
return filtered
|
|
}
|
|
|
|
// queryServer is the narrow view of a server that the failover logic needs.
// Keeping it as an interface makes the failover logic easier to test.
type queryServer interface {
	// GetLogger returns the logger used for failover diagnostics.
	GetLogger() *log.Logger

	// GetOtherDatacentersByDistance returns the candidate datacenters for
	// failover, or an error if they cannot be determined.
	GetOtherDatacentersByDistance() ([]string, error)

	// ForwardDC sends the named method call with args to datacenter dc and
	// fills in reply.
	ForwardDC(method, dc string, args, reply interface{}) error
}
|
|
|
|
// queryServerWrapper applies the queryServer interface to a Server.
|
|
type queryServerWrapper struct {
|
|
srv *Server
|
|
}
|
|
|
|
// GetLogger returns the server's logger.
|
|
func (q *queryServerWrapper) GetLogger() *log.Logger {
|
|
return q.srv.logger
|
|
}
|
|
|
|
// GetOtherDatacentersByDistance calls into the server's fn and filters out the
|
|
// server's own DC.
|
|
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
|
|
// TODO (slackpad) - We should cache this result since it's expensive to
|
|
// compute.
|
|
dcs, err := q.srv.router.GetDatacentersByDistance()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []string
|
|
for _, dc := range dcs {
|
|
if dc != q.srv.config.Datacenter {
|
|
result = append(result, dc)
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// ForwardDC calls into the server's RPC forwarder.
|
|
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
|
|
return q.srv.forwardDC(method, dc, args, reply)
|
|
}
|
|
|
|
// queryFailover runs an algorithm to determine which DCs to try and then calls
|
|
// them to try to locate alternative services.
|
|
func queryFailover(q queryServer, query *structs.PreparedQuery,
|
|
args *structs.PreparedQueryExecuteRequest,
|
|
reply *structs.PreparedQueryExecuteResponse) error {
|
|
|
|
// Pull the list of other DCs. This is sorted by RTT in case the user
|
|
// has selected that.
|
|
nearest, err := q.GetOtherDatacentersByDistance()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// This will help us filter unknown DCs supplied by the user.
|
|
known := make(map[string]struct{})
|
|
for _, dc := range nearest {
|
|
known[dc] = struct{}{}
|
|
}
|
|
|
|
// Build a candidate list of DCs to try, starting with the nearest N
|
|
// from RTTs.
|
|
var dcs []string
|
|
index := make(map[string]struct{})
|
|
if query.Service.Failover.NearestN > 0 {
|
|
for i, dc := range nearest {
|
|
if !(i < query.Service.Failover.NearestN) {
|
|
break
|
|
}
|
|
|
|
dcs = append(dcs, dc)
|
|
index[dc] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Then add any DCs explicitly listed that weren't selected above.
|
|
for _, dc := range query.Service.Failover.Datacenters {
|
|
// This will prevent a log of other log spammage if we do not
|
|
// attempt to talk to datacenters we don't know about.
|
|
if _, ok := known[dc]; !ok {
|
|
q.GetLogger().Printf("[DEBUG] consul.prepared_query: Skipping unknown datacenter '%s' in prepared query", dc)
|
|
continue
|
|
}
|
|
|
|
// This will make sure we don't re-try something that fails
|
|
// from the NearestN list.
|
|
if _, ok := index[dc]; !ok {
|
|
dcs = append(dcs, dc)
|
|
}
|
|
}
|
|
|
|
// Now try the selected DCs in priority order.
|
|
failovers := 0
|
|
for _, dc := range dcs {
|
|
// This keeps track of how many iterations we actually run.
|
|
failovers++
|
|
|
|
// Be super paranoid and set the nodes slice to nil since it's
|
|
// the same slice we used before. We know there's nothing in
|
|
// there, but the underlying msgpack library has a policy of
|
|
// updating the slice when it's non-nil, and that feels dirty.
|
|
// Let's just set it to nil so there's no way to communicate
|
|
// through this slice across successive RPC calls.
|
|
reply.Nodes = nil
|
|
|
|
// Note that we pass along the limit since it can be applied
|
|
// remotely to save bandwidth. We also pass along the consistency
|
|
// mode information and token we were given, so that applies to
|
|
// the remote query as well.
|
|
remote := &structs.PreparedQueryExecuteRemoteRequest{
|
|
Datacenter: dc,
|
|
Query: *query,
|
|
Limit: args.Limit,
|
|
QueryOptions: args.QueryOptions,
|
|
Connect: args.Connect,
|
|
}
|
|
if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
|
|
q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
|
|
continue
|
|
}
|
|
|
|
// We can stop if we found some nodes.
|
|
if len(reply.Nodes) > 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Set this at the end because the response from the remote doesn't have
|
|
// this information.
|
|
reply.Failovers = failovers
|
|
|
|
return nil
|
|
}
|