99e0a124cb
This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. There is still quite a bit left to do, but most of it is docs, testing, and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week.

Description

At a high level, this PR mainly splits ACL tokens from policies and splits the concept of authorization from identity. Much of the change simply supports CRUD operations on ACLTokens and ACLPolicies, which in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled, and the separation of policy from identity, which could lead the way to allowing alternative identity providers.

On the surface, and with a new cluster, the ACL system will look very similar to Nomad's. Both have tokens and policies, both have local tokens, and the ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap reset procedure. There are a few key differences, though. Nomad requires token and policy replication, whereas Consul only requires policy replication, with token replication being opt-in; local tokens in Consul do require token replication to be enabled, however. All policies in Nomad are globally applicable, while in Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters, which allows for more granular access management.

Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are:

- A server running the new system must still support clients using the legacy system.
- A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system.
- The primary ACL datacenter's servers running in legacy mode must act as a gate that keeps everything else in the entire multi-DC cluster running in legacy mode.

So this PR not only implements the new ACL system but also builds in a legacy mode for when the cluster isn't ready for new ACLs. Detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
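To make the token/policy split concrete, here is a minimal, self-contained Go sketch of the model described above. The type and field names (ACLPolicy, ACLToken, policiesForToken, Datacenters, and so on) are illustrative only and are not lifted from this PR's actual structs: a policy carries rules plus an optional datacenter scope, while a token is just an identity that links to policies and gets resolved per datacenter.

// Illustrative sketch only; names do not necessarily match the PR's structs.
package main

import "fmt"

// ACLPolicy carries the rules. Policies are stored and replicated globally but
// can be scoped to a subset of datacenters via Datacenters.
type ACLPolicy struct {
	ID          string
	Name        string
	Rules       string
	Datacenters []string // empty means valid in all datacenters
}

// ACLToken is purely an identity that links to one or more policies. Local
// tokens are only valid in the datacenter that created them.
type ACLToken struct {
	AccessorID string
	SecretID   string
	Policies   []string // policy IDs
	Local      bool
}

// policiesForToken resolves a token to the policies that apply in the given
// datacenter, mirroring the token -> policies -> authorizer flow.
func policiesForToken(token *ACLToken, byID map[string]*ACLPolicy, dc string) []*ACLPolicy {
	var out []*ACLPolicy
	for _, id := range token.Policies {
		p, ok := byID[id]
		if !ok {
			continue
		}
		if len(p.Datacenters) == 0 {
			out = append(out, p)
			continue
		}
		for _, d := range p.Datacenters {
			if d == dc {
				out = append(out, p)
				break
			}
		}
	}
	return out
}

func main() {
	policies := map[string]*ACLPolicy{
		"ops": {ID: "ops", Name: "ops", Rules: `service_prefix "" { policy = "write" }`, Datacenters: []string{"dc1"}},
	}
	token := &ACLToken{AccessorID: "abc", SecretID: "xyz", Policies: []string{"ops"}}
	fmt.Println(len(policiesForToken(token, policies, "dc1"))) // 1: policy is scoped to dc1
	fmt.Println(len(policiesForToken(token, policies, "dc2"))) // 0: out of scope in dc2
}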
752 lines
23 KiB
Go
package consul

import (
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-uuid"
)

var (
	// ErrQueryNotFound is returned if the query lookup failed.
	ErrQueryNotFound = errors.New("Query not found")
)

// PreparedQuery manages the prepared query endpoint.
type PreparedQuery struct {
	srv *Server
}

// Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data. The ID of the query is
// returned in the reply.
func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
	if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now())

	// Validate the ID. We must create new IDs before applying to the Raft
	// log since it's not deterministic.
	if args.Op == structs.PreparedQueryCreate {
		if args.Query.ID != "" {
			return fmt.Errorf("ID must be empty when creating a new prepared query")
		}

		// We are relying on the fact that UUIDs are random and unlikely
		// to collide since this isn't inside a write transaction.
		state := p.srv.fsm.State()
		for {
			if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
				return fmt.Errorf("UUID generation for prepared query failed: %v", err)
			}
			_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
			if err != nil {
				return fmt.Errorf("Prepared query lookup failed: %v", err)
			}
			if query == nil {
				break
			}
		}
	}
	*reply = args.Query.ID

	// Get the ACL token for the request for the checks below.
	rule, err := p.srv.ResolveToken(args.Token)
	if err != nil {
		return err
	}

	// If prefix ACLs apply to the incoming query, then do an ACL check. We
	// need to make sure they have write access for whatever they are
	// proposing.
	if prefix, ok := args.Query.GetACLPrefix(); ok {
		if rule != nil && !rule.PreparedQueryWrite(prefix) {
			p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
			return acl.ErrPermissionDenied
		}
	}

	// This is the second part of the check above. If they are referencing
	// an existing query then make sure it exists and that they have write
	// access to whatever they are changing, if prefix ACLs apply to it.
	if args.Op != structs.PreparedQueryCreate {
		state := p.srv.fsm.State()
		_, query, err := state.PreparedQueryGet(nil, args.Query.ID)
		if err != nil {
			return fmt.Errorf("Prepared Query lookup failed: %v", err)
		}
		if query == nil {
			return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
		}

		if prefix, ok := query.GetACLPrefix(); ok {
			if rule != nil && !rule.PreparedQueryWrite(prefix) {
				p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied due to ACLs", args.Query.ID)
				return acl.ErrPermissionDenied
			}
		}
	}

	// Parse the query and prep it for the state store.
	switch args.Op {
	case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
		if err := parseQuery(args.Query, p.srv.config.ACLEnforceVersion8); err != nil {
			return fmt.Errorf("Invalid prepared query: %v", err)
		}

	case structs.PreparedQueryDelete:
		// Nothing else to verify here, just do the delete (we only look
		// at the ID field for this op).

	default:
		return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
	}

	// Commit the query to the state store.
	resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
	if err != nil {
		p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	return nil
}

// parseQuery makes sure the entries of a query are valid for a create or
// update operation. Some of the fields are not checked or are partially
// checked, as noted in the comments below. This also updates all the parsed
// fields of the query.
func parseQuery(query *structs.PreparedQuery, enforceVersion8 bool) error {
	// We skip a few fields:
	// - ID is checked outside this fn.
	// - Name is optional with no restrictions, except for uniqueness which
	//   is checked for integrity during the transaction. We also make sure
	//   names do not overlap with IDs, which is also checked during the
	//   transaction. Otherwise, people could "steal" queries that they don't
	//   have proper ACL rights to change.
	// - Template is checked during the transaction since that's where we
	//   compile it.

	// Anonymous queries require a session or need to be part of a template.
	if enforceVersion8 {
		if query.Name == "" && query.Template.Type == "" && query.Session == "" {
			return fmt.Errorf("Must be bound to a session")
		}
	}

	// Token is checked when the query is executed, but we do make sure the
	// user hasn't accidentally pasted-in the special redacted token name,
	// which if we allowed in would be super hard to debug and understand.
	if query.Token == redactedToken {
		return fmt.Errorf("Bad Token '%s', it looks like a query definition with a redacted token was submitted", query.Token)
	}

	// Parse the service query sub-structure.
	if err := parseService(&query.Service); err != nil {
		return err
	}

	// Parse the DNS options sub-structure.
	if err := parseDNS(&query.DNS); err != nil {
		return err
	}

	return nil
}

// parseService makes sure the entries of a query are valid for a create or
// update operation. Some of the fields are not checked or are partially
// checked, as noted in the comments below. This also updates all the parsed
// fields of the query.
func parseService(svc *structs.ServiceQuery) error {
	// Service is required.
	if svc.Service == "" {
		return fmt.Errorf("Must provide a Service name to query")
	}

	// NearestN can be 0 which means "don't fail over by RTT".
	if svc.Failover.NearestN < 0 {
		return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
	}

	// Make sure the metadata filters are valid
	if err := structs.ValidateMetadata(svc.NodeMeta, true); err != nil {
		return err
	}

	// We skip a few fields:
	// - There's no validation for Datacenters; we skip any unknown entries
	//   at execution time.
	// - OnlyPassing is just a boolean so doesn't need further validation.
	// - Tags is a free-form list of tags and doesn't need further validation.

	return nil
}

// parseDNS makes sure the entries of a query are valid for a create or
// update operation. This also updates all the parsed fields of the query.
func parseDNS(dns *structs.QueryDNSOptions) error {
	if dns.TTL != "" {
		ttl, err := time.ParseDuration(dns.TTL)
		if err != nil {
			return fmt.Errorf("Bad DNS TTL '%s': %v", dns.TTL, err)
		}

		if ttl < 0 {
			return fmt.Errorf("DNS TTL '%d', must be >=0", ttl)
		}
	}

	return nil
}

// Get returns a single prepared query by ID.
func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
	reply *structs.IndexedPreparedQueries) error {
	if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done {
		return err
	}

	return p.srv.blockingQuery(
		&args.QueryOptions,
		&reply.QueryMeta,
		func(ws memdb.WatchSet, state *state.Store) error {
			index, query, err := state.PreparedQueryGet(ws, args.QueryID)
			if err != nil {
				return err
			}
			if query == nil {
				return ErrQueryNotFound
			}

			// If no prefix ACL applies to this query, then they are
			// always allowed to see it if they have the ID. We still
			// have to filter the remaining object for tokens.
			reply.Index = index
			reply.Queries = structs.PreparedQueries{query}
			if _, ok := query.GetACLPrefix(); !ok {
				return p.srv.filterACL(args.Token, &reply.Queries[0])
			}

			// Otherwise, attempt to filter it the usual way.
			if err := p.srv.filterACL(args.Token, reply); err != nil {
				return err
			}

			// Since this is a GET of a specific query, if ACLs have
			// prevented us from returning something that exists,
			// then alert the user with a permission denied error.
			if len(reply.Queries) == 0 {
				p.srv.logger.Printf("[WARN] consul.prepared_query: Request to get prepared query '%s' denied due to ACLs", args.QueryID)
				return acl.ErrPermissionDenied
			}

			return nil
		})
}

// List returns all the prepared queries.
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
	if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
		return err
	}

	return p.srv.blockingQuery(
		&args.QueryOptions,
		&reply.QueryMeta,
		func(ws memdb.WatchSet, state *state.Store) error {
			index, queries, err := state.PreparedQueryList(ws)
			if err != nil {
				return err
			}

			reply.Index, reply.Queries = index, queries
			return p.srv.filterACL(args.Token, reply)
		})
}

// Explain resolves a prepared query and returns the (possibly rendered template)
// to the caller. This is useful for letting operators figure out which query is
// picking up a given name. We can also add additional info about how the query
// will be executed here.
func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
	reply *structs.PreparedQueryExplainResponse) error {
	if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now())

	// We have to do this ourselves since we are not doing a blocking RPC.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if err := p.srv.consistentRead(); err != nil {
			return err
		}
	}

	// Try to locate the query.
	state := p.srv.fsm.State()
	_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
	if err != nil {
		return err
	}
	if query == nil {
		return ErrQueryNotFound
	}

	// Place the query into a list so we can run the standard ACL filter on
	// it.
	queries := &structs.IndexedPreparedQueries{
		Queries: structs.PreparedQueries{query},
	}
	if err := p.srv.filterACL(args.Token, queries); err != nil {
		return err
	}

	// If the query was filtered out, return an error.
	if len(queries.Queries) == 0 {
		p.srv.logger.Printf("[WARN] consul.prepared_query: Explain on prepared query '%s' denied due to ACLs", query.ID)
		return acl.ErrPermissionDenied
	}

	reply.Query = *(queries.Queries[0])
	return nil
}

// Execute runs a prepared query and returns the results. This will perform the
// failover logic if no local results are available. This is typically called as
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
	reply *structs.PreparedQueryExecuteResponse) error {
	if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now())

	// We have to do this ourselves since we are not doing a blocking RPC.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if err := p.srv.consistentRead(); err != nil {
			return err
		}
	}

	// Try to locate the query.
	state := p.srv.fsm.State()
	_, query, err := state.PreparedQueryResolve(args.QueryIDOrName, args.Agent)
	if err != nil {
		return err
	}
	if query == nil {
		return ErrQueryNotFound
	}

	// Execute the query for the local DC.
	if err := p.execute(query, reply, args.Connect); err != nil {
		return err
	}

	// If they supplied a token with the query, use that, otherwise use the
	// token passed in with the request.
	token := args.QueryOptions.Token
	if query.Token != "" {
		token = query.Token
	}
	if err := p.srv.filterACL(token, &reply.Nodes); err != nil {
		return err
	}

	// TODO (slackpad) We could add a special case here that will avoid the
	// fail over if we filtered everything due to ACLs. This seems like it
	// might not be worth the code complexity and behavior differences,
	// though, since this is essentially a misconfiguration.

	// Shuffle the results in case coordinates are not available if they
	// requested an RTT sort.
	reply.Nodes.Shuffle()

	// Build the query source. This can be provided by the client, or by
	// the prepared query. Client-specified takes priority.
	qs := args.Source
	if qs.Datacenter == "" {
		qs.Datacenter = args.Agent.Datacenter
	}
	if query.Service.Near != "" && qs.Node == "" {
		qs.Node = query.Service.Near
	}

	// Respect the magic "_agent" flag.
	if qs.Node == "_agent" {
		qs.Node = args.Agent.Node
	} else if qs.Node == "_ip" {
		if args.Source.Ip != "" {
			_, nodes, err := state.Nodes(nil)
			if err != nil {
				return err
			}

			for _, node := range nodes {
				if args.Source.Ip == node.Address {
					qs.Node = node.Node
					break
				}
			}
		} else {
			p.srv.logger.Printf("[WARN] Prepared Query using near=_ip requires " +
				"the source IP to be set but none was provided. No distance " +
				"sorting will be done.")
		}

		// Either a source IP was given but we couldn't find the associated node,
		// or no source IP was given. In both cases we should wipe the Node value.
		if qs.Node == "_ip" {
			qs.Node = ""
		}
	}

	// Perform the distance sort
	err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
	if err != nil {
		return err
	}

	// If we applied a distance sort, make sure that the node queried for is in
	// position 0, provided the results are from the same datacenter.
	if qs.Node != "" && reply.Datacenter == qs.Datacenter {
		for i, node := range reply.Nodes {
			if node.Node.Node == qs.Node {
				reply.Nodes[0], reply.Nodes[i] = reply.Nodes[i], reply.Nodes[0]
				break
			}

			// Put a cap on the depth of the search. The local agent should
			// never be further in than this if distance sorting was applied.
			if i == 9 {
				break
			}
		}
	}

	// Apply the limit if given.
	if args.Limit > 0 && len(reply.Nodes) > args.Limit {
		reply.Nodes = reply.Nodes[:args.Limit]
	}

	// In the happy path where we found some healthy nodes we go with that
	// and bail out. Otherwise, we fail over and try remote DCs, as allowed
	// by the query setup.
	if len(reply.Nodes) == 0 {
		wrapper := &queryServerWrapper{p.srv}
		if err := queryFailover(wrapper, query, args, reply); err != nil {
			return err
		}
	}

	return nil
}

// ExecuteRemote is used when a local node doesn't have any instances of a
// service available and needs to probe remote DCs. This sends the full query
// over since the remote side won't have it in its state store, and this doesn't
// do the failover logic since that's already being run on the originating DC.
// We don't want things to fan out further than one level.
func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
	reply *structs.PreparedQueryExecuteResponse) error {
	if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now())

	// We have to do this ourselves since we are not doing a blocking RPC.
	p.srv.setQueryMeta(&reply.QueryMeta)
	if args.RequireConsistent {
		if err := p.srv.consistentRead(); err != nil {
			return err
		}
	}

	// Run the query locally to see what we can find.
	if err := p.execute(&args.Query, reply, args.Connect); err != nil {
		return err
	}

	// If they supplied a token with the query, use that, otherwise use the
	// token passed in with the request.
	token := args.QueryOptions.Token
	if args.Query.Token != "" {
		token = args.Query.Token
	}
	if err := p.srv.filterACL(token, &reply.Nodes); err != nil {
		return err
	}

	// We don't bother trying to do an RTT sort here since we are by
	// definition in another DC. We just shuffle to make sure that we
	// balance the load across the results.
	reply.Nodes.Shuffle()

	// Apply the limit if given.
	if args.Limit > 0 && len(reply.Nodes) > args.Limit {
		reply.Nodes = reply.Nodes[:args.Limit]
	}

	return nil
}

// execute runs a prepared query in the local DC without any failover. We don't
// apply any sorting options or ACL checks at this level - it should be done up above.
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
	reply *structs.PreparedQueryExecuteResponse,
	forceConnect bool) error {
	state := p.srv.fsm.State()

	// If we're requesting Connect-capable services, then switch the
	// lookup to be the Connect function.
	f := state.CheckServiceNodes
	if query.Service.Connect || forceConnect {
		f = state.CheckConnectServiceNodes
	}

	_, nodes, err := f(nil, query.Service.Service)
	if err != nil {
		return err
	}

	// Filter out any unhealthy nodes.
	nodes = nodes.FilterIgnore(query.Service.OnlyPassing,
		query.Service.IgnoreCheckIDs)

	// Apply the node metadata filters, if any.
	if len(query.Service.NodeMeta) > 0 {
		nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
	}

	// Apply the tag filters, if any.
	if len(query.Service.Tags) > 0 {
		nodes = tagFilter(query.Service.Tags, nodes)
	}

	// Capture the nodes and pass the DNS information through to the reply.
	reply.Service = query.Service.Service
	reply.Nodes = nodes
	reply.DNS = query.DNS

	// Stamp the result for this datacenter.
	reply.Datacenter = p.srv.config.Datacenter

	return nil
}

// tagFilter returns a list of nodes that satisfy the given tags. Nodes must have
// ALL the given tags, and NONE of the forbidden tags (prefixed with !). Note
// for performance this modifies the original slice.
func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
	// Build up lists of required and disallowed tags.
	must, not := make([]string, 0), make([]string, 0)
	for _, tag := range tags {
		tag = strings.ToLower(tag)
		if strings.HasPrefix(tag, "!") {
			tag = tag[1:]
			not = append(not, tag)
		} else {
			must = append(must, tag)
		}
	}

	n := len(nodes)
	for i := 0; i < n; i++ {
		node := nodes[i]

		// Index the tags so lookups this way are cheaper.
		index := make(map[string]struct{})
		if node.Service != nil {
			for _, tag := range node.Service.Tags {
				tag = strings.ToLower(tag)
				index[tag] = struct{}{}
			}
		}

		// Bail if any of the required tags are missing.
		for _, tag := range must {
			if _, ok := index[tag]; !ok {
				goto DELETE
			}
		}

		// Bail if any of the disallowed tags are present.
		for _, tag := range not {
			if _, ok := index[tag]; ok {
				goto DELETE
			}
		}

		// At this point, the service is ok to leave in the list.
		continue

	DELETE:
		nodes[i], nodes[n-1] = nodes[n-1], structs.CheckServiceNode{}
		n--
		i--
	}
	return nodes[:n]
}

// nodeMetaFilter returns a list of the nodes that satisfy the given metadata
// filters. Nodes must match ALL of the given filters.
func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
	var filtered structs.CheckServiceNodes
	for _, node := range nodes {
		if structs.SatisfiesMetaFilters(node.Node.Meta, filters) {
			filtered = append(filtered, node)
		}
	}
	return filtered
}

// queryServer is a wrapper that makes it easier to test the failover logic.
type queryServer interface {
	GetLogger() *log.Logger
	GetOtherDatacentersByDistance() ([]string, error)
	ForwardDC(method, dc string, args interface{}, reply interface{}) error
}

// queryServerWrapper applies the queryServer interface to a Server.
type queryServerWrapper struct {
	srv *Server
}

// GetLogger returns the server's logger.
func (q *queryServerWrapper) GetLogger() *log.Logger {
	return q.srv.logger
}

// GetOtherDatacentersByDistance calls into the server's fn and filters out the
// server's own DC.
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
	// TODO (slackpad) - We should cache this result since it's expensive to
	// compute.
	dcs, err := q.srv.router.GetDatacentersByDistance()
	if err != nil {
		return nil, err
	}

	var result []string
	for _, dc := range dcs {
		if dc != q.srv.config.Datacenter {
			result = append(result, dc)
		}
	}
	return result, nil
}

// ForwardDC calls into the server's RPC forwarder.
func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, reply interface{}) error {
	return q.srv.forwardDC(method, dc, args, reply)
}

// queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery,
	args *structs.PreparedQueryExecuteRequest,
	reply *structs.PreparedQueryExecuteResponse) error {

	// Pull the list of other DCs. This is sorted by RTT in case the user
	// has selected that.
	nearest, err := q.GetOtherDatacentersByDistance()
	if err != nil {
		return err
	}

	// This will help us filter unknown DCs supplied by the user.
	known := make(map[string]struct{})
	for _, dc := range nearest {
		known[dc] = struct{}{}
	}

	// Build a candidate list of DCs to try, starting with the nearest N
	// from RTTs.
	var dcs []string
	index := make(map[string]struct{})
	if query.Service.Failover.NearestN > 0 {
		for i, dc := range nearest {
			if !(i < query.Service.Failover.NearestN) {
				break
			}

			dcs = append(dcs, dc)
			index[dc] = struct{}{}
		}
	}

	// Then add any DCs explicitly listed that weren't selected above.
	for _, dc := range query.Service.Failover.Datacenters {
		// This will prevent a lot of log spam if we do not
		// attempt to talk to datacenters we don't know about.
		if _, ok := known[dc]; !ok {
			q.GetLogger().Printf("[DEBUG] consul.prepared_query: Skipping unknown datacenter '%s' in prepared query", dc)
			continue
		}

		// This will make sure we don't re-try something that fails
		// from the NearestN list.
		if _, ok := index[dc]; !ok {
			dcs = append(dcs, dc)
		}
	}

	// Now try the selected DCs in priority order.
	failovers := 0
	for _, dc := range dcs {
		// This keeps track of how many iterations we actually run.
		failovers++

		// Be super paranoid and set the nodes slice to nil since it's
		// the same slice we used before. We know there's nothing in
		// there, but the underlying msgpack library has a policy of
		// updating the slice when it's non-nil, and that feels dirty.
		// Let's just set it to nil so there's no way to communicate
		// through this slice across successive RPC calls.
		reply.Nodes = nil

		// Note that we pass along the limit since it can be applied
		// remotely to save bandwidth. We also pass along the consistency
		// mode information and token we were given, so that applies to
		// the remote query as well.
		remote := &structs.PreparedQueryExecuteRemoteRequest{
			Datacenter:   dc,
			Query:        *query,
			Limit:        args.Limit,
			QueryOptions: args.QueryOptions,
			Connect:      args.Connect,
		}
		if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
			q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err)
			continue
		}

		// We can stop if we found some nodes.
		if len(reply.Nodes) > 0 {
			break
		}
	}

	// Set this at the end because the response from the remote doesn't have
	// this information.
	reply.Failovers = failovers

	return nil
}
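For context on how these server endpoints get exercised, here is a minimal sketch of a client creating and executing a prepared query through the Consul HTTP API Go client (github.com/hashicorp/consul/api). The field and method names here are from memory of that package and may differ slightly between versions, so treat it as illustrative rather than canonical.

// Illustrative client-side sketch; not part of this file.
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	pq := client.PreparedQuery()

	// Create a query that resolves healthy "web" instances and fails over to
	// the nearest two datacenters when nothing passes locally. This drives
	// PreparedQuery.Apply on the servers and, at execution time, queryFailover.
	id, _, err := pq.Create(&api.PreparedQueryDefinition{
		Name: "web",
		Service: api.ServiceQuery{
			Service:     "web",
			OnlyPassing: true,
			Failover: api.QueryDatacenterOptions{
				NearestN: 2,
			},
		},
	}, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Execute by ID or name; DNS lookups for "web.query.consul" take the same
	// path through PreparedQuery.Execute.
	resp, _, err := pq.Execute(id, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("datacenter=%s failovers=%d nodes=%d\n",
		resp.Datacenter, resp.Failovers, len(resp.Nodes))
}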