open-consul/agent/consul/catalog_endpoint.go

1070 lines
33 KiB
Go
Raw Normal View History

package consul
import (
2013-12-24 20:43:34 +00:00
"fmt"
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. 
The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
"sort"
"strings"
2014-02-20 23:16:26 +00:00
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
pkg refactor command/agent/* -> agent/* command/consul/* -> agent/consul/* command/agent/command{,_test}.go -> command/agent{,_test}.go command/base/command.go -> command/base.go command/base/* -> command/* commands.go -> command/commands.go The script which did the refactor is: ( cd $GOPATH/src/github.com/hashicorp/consul git mv command/agent/command.go command/agent.go git mv command/agent/command_test.go command/agent_test.go git mv command/agent/flag_slice_value{,_test}.go command/ git mv command/agent . git mv command/base/command.go command/base.go git mv command/base/config_util{,_test}.go command/ git mv commands.go command/ git mv consul agent rmdir command/base/ gsed -i -e 's|package agent|package command|' command/agent{,_test}.go gsed -i -e 's|package agent|package command|' command/flag_slice_value{,_test}.go gsed -i -e 's|package base|package command|' command/base.go command/config_util{,_test}.go gsed -i -e 's|package main|package command|' command/commands.go gsed -i -e 's|base.Command|BaseCommand|' command/commands.go gsed -i -e 's|agent.Command|AgentCommand|' command/commands.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/commands.go gsed -i -e 's|base\.||' command/commands.go gsed -i -e 's|command\.||' command/commands.go gsed -i -e 's|command|c|' main.go gsed -i -e 's|range Commands|range command.Commands|' main.go gsed -i -e 's|Commands: Commands|Commands: command.Commands|' main.go gsed -i -e 's|base\.BoolValue|BoolValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.DurationValue|DurationValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.StringValue|StringValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.UintValue|UintValue|' command/operator_autopilot_set.go gsed -i -e 's|\bCommand\b|BaseCommand|' command/base.go gsed -i -e 's|BaseCommand Options|Command Options|' command/base.go gsed -i -e 's|base.Command|BaseCommand|' command/*.go gsed -i -e 's|c\.Command|c.BaseCommand|g' command/*.go gsed -i -e 
's|\tCommand:|\tBaseCommand:|' command/*_test.go gsed -i -e 's|base\.||' command/*_test.go gsed -i -e 's|\bCommand\b|AgentCommand|' command/agent{,_test}.go gsed -i -e 's|cmd.AgentCommand|cmd.BaseCommand|' command/agent.go gsed -i -e 's|cli.AgentCommand = new(Command)|cli.Command = new(AgentCommand)|' command/agent_test.go gsed -i -e 's|exec.AgentCommand|exec.Command|' command/agent_test.go gsed -i -e 's|exec.BaseCommand|exec.Command|' command/agent_test.go gsed -i -e 's|NewTestAgent|agent.NewTestAgent|' command/agent_test.go gsed -i -e 's|= TestConfig|= agent.TestConfig|' command/agent_test.go gsed -i -e 's|: RetryJoin|: agent.RetryJoin|' command/agent_test.go gsed -i -e 's|\.\./\.\./|../|' command/config_util_test.go gsed -i -e 's|\bverifyUniqueListeners|VerifyUniqueListeners|' agent/config{,_test}.go command/agent.go gsed -i -e 's|\bserfLANKeyring\b|SerfLANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bserfWANKeyring\b|SerfWANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bNewAgent\b|agent.New|g' command/agent{,_test}.go gsed -i -e 's|\bNewAgent|New|' agent/{acl_test,agent,testagent}.go gsed -i -e 's|\bAgent\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bBool\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDefaultConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDevConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bMergeConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bReadConfigPaths\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bParseMetaPair\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfLANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfWANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|circonus\.agent|circonus|g' command/agent{,_test}.go gsed -i -e 's|logger\.agent|logger|g' command/agent{,_test}.go gsed -i -e 's|metrics\.agent|metrics|g' 
command/agent{,_test}.go gsed -i -e 's|// agent.Agent|// agent|' command/agent{,_test}.go gsed -i -e 's|a\.agent\.Config|a.Config|' command/agent{,_test}.go gsed -i -e 's|agent\.AppendSliceValue|AppendSliceValue|' command/{configtest,validate}.go gsed -i -e 's|consul/consul|agent/consul|' GNUmakefile gsed -i -e 's|\.\./test|../../test|' agent/consul/server_test.go # fix imports f=$(grep -rl 'github.com/hashicorp/consul/command/agent' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/command/agent|github.com/hashicorp/consul/agent|' $f goimports -w $f f=$(grep -rl 'github.com/hashicorp/consul/consul' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/consul|github.com/hashicorp/consul/agent/consul|' $f goimports -w $f goimports -w command/*.go main.go )
2017-06-09 22:28:28 +00:00
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/types"
)
var CatalogCounters = []prometheus.CounterDefinition{
{
Name: []string{"catalog", "service", "query"},
2020-11-16 19:02:11 +00:00
Help: "Increments for each catalog query for the given service.",
},
{
Name: []string{"catalog", "connect", "query"},
2020-11-23 21:06:30 +00:00
Help: "Increments for each connect-based catalog query for the given service.",
},
{
Name: []string{"catalog", "service", "query-tag"},
2020-11-16 19:02:11 +00:00
Help: "Increments for each catalog query for the given service with the given tag.",
},
{
Name: []string{"catalog", "connect", "query-tag"},
2020-11-23 21:06:30 +00:00
Help: "Increments for each connect-based catalog query for the given service with the given tag.",
},
{
Name: []string{"catalog", "service", "query-tags"},
2020-11-16 19:02:11 +00:00
Help: "Increments for each catalog query for the given service with the given tags.",
},
{
Name: []string{"catalog", "connect", "query-tags"},
2020-11-23 21:06:30 +00:00
Help: "Increments for each connect-based catalog query for the given service with the given tags.",
},
{
Name: []string{"catalog", "service", "not-found"},
2020-11-16 19:02:11 +00:00
Help: "Increments for each catalog query where the given service could not be found.",
},
{
Name: []string{"catalog", "connect", "not-found"},
2020-11-23 21:06:30 +00:00
Help: "Increments for each connect-based catalog query where the given service could not be found.",
},
}
var CatalogSummaries = []prometheus.SummaryDefinition{
{
Name: []string{"catalog", "deregister"},
2020-11-16 19:02:11 +00:00
Help: "Measures the time it takes to complete a catalog deregister operation.",
},
{
Name: []string{"catalog", "register"},
2020-11-16 19:02:11 +00:00
Help: "Measures the time it takes to complete a catalog register operation.",
},
}
// Catalog endpoint is used to manipulate the service catalog
type Catalog struct {
2020-06-12 02:05:07 +00:00
srv *Server
logger hclog.Logger
}
// hasPeerNameInRequest reports whether any part of the register request
// carries a PeerName other than structs.DefaultPeerKeyword. It inspects the
// request itself (node level), its service, its single legacy check and every
// entry of its checks list. A nil request never has a peer name.
func hasPeerNameInRequest(req *structs.RegisterRequest) bool {
	if req == nil {
		return false
	}

	// Node, service and the single check, in that order.
	switch {
	case req.PeerName != structs.DefaultPeerKeyword:
		return true
	case req.Service != nil && req.Service.PeerName != structs.DefaultPeerKeyword:
		return true
	case req.Check != nil && req.Check.PeerName != structs.DefaultPeerKeyword:
		return true
	}

	// Finally, the list of checks.
	for _, chk := range req.Checks {
		if chk.PeerName != structs.DefaultPeerKeyword {
			return true
		}
	}
	return false
}
// Register a service and/or check(s) in a node, creating the node if it doesn't exist.
// It is valid to pass no service or checks to simply create the node itself.
2013-12-19 20:03:57 +00:00
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if !c.srv.config.PeeringTestAllowPeerRegistrations && hasPeerNameInRequest(args) {
return fmt.Errorf("cannot register requests with PeerName in them")
}
if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())
// Fetch the ACL token, if any.
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(args.GetEnterpriseMeta(), true); err != nil {
return err
}
// This needs to happen before the other preapply checks as it will fixup some of the
// internal enterprise metas on the services and checks
state := c.srv.fsm.State()
entMeta, err := state.ValidateRegisterRequest(args)
if err != nil {
return err
}
// Verify the args.
if err := nodePreApply(args.Node, string(args.ID)); err != nil {
return err
}
if args.Address == "" && !args.SkipNodeUpdate {
return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
}
// Handle a service registration.
if args.Service != nil {
if err := servicePreApply(args.Service, authz, args.Service.FillAuthzContext); err != nil {
return err
}
}
// Move the old format single check into the slice, and fixup IDs.
if args.Check != nil {
args.Checks = append(args.Checks, args.Check)
args.Check = nil
}
for _, check := range args.Checks {
if check.Node == "" {
check.Node = args.Node
}
checkPreApply(check)
2019-10-17 18:33:11 +00:00
// Populate check type for cases when a check is registered in the catalog directly
// and not via anti-entropy
if check.Type == "" {
chkType := check.CheckType()
check.Type = chkType.Type()
}
}
// Check the complete register request against the given ACL policy.
_, ns, err := state.NodeServices(nil, args.Node, entMeta, args.PeerName)
if err != nil {
return fmt.Errorf("Node lookup failed: %v", err)
}
if err := vetRegisterWithACL(authz, args, ns); err != nil {
return err
}
_, err = c.srv.raftApply(structs.RegisterRequestType, args)
return err
}
// nodePreApply verifies the node portion of a request before it is applied to
// Raft: the node name is mandatory, and the node ID, when supplied, must parse
// as a UUID.
func nodePreApply(nodeName, nodeID string) error {
	if nodeName == "" {
		return fmt.Errorf("Must provide node")
	}

	// The ID is optional; nothing more to check without one.
	if nodeID == "" {
		return nil
	}

	if _, parseErr := uuid.ParseUUID(nodeID); parseErr != nil {
		return fmt.Errorf("Bad node ID: %v", parseErr)
	}
	return nil
}
// servicePreApply verifies a service before it is applied to Raft and fills
// in a default service ID. It validates the service, defaults the ID to the
// service name, rejects an ID without a name and an "any" address, and checks
// service write permission for the service (and, for Connect proxies, for the
// proxy's destination service). authzCtxFill populates the authorizer context
// used for the ACL checks.
func servicePreApply(service *structs.NodeService, authz resolver.Result, authzCtxFill func(*acl.AuthorizerContext)) error {
	// Run the service's own validation first; the checks further down are
	// applied in addition to it.
	if err := service.Validate(); err != nil {
		return err
	}

	switch {
	case service.ID == "" && service.Service != "":
		// No service ID but a service name: default the ID to the name.
		service.ID = service.Service
	case service.ID != "" && service.Service == "":
		// An ID without a name is not allowed.
		return fmt.Errorf("Must provide service name (Service.Service) when service ID is provided")
	}

	// The service address is checked here as well as in the agent endpoint
	// because service registration isn't synchronous.
	if ipaddr.IsAny(service.Address) {
		return fmt.Errorf("Invalid service address")
	}

	var authzCtx acl.AuthorizerContext
	authzCtxFill(&authzCtx)

	// Apply the ACL policy, if any. The 'consul' service is exempt from this
	// check. The same service write policy is evaluated again later when the
	// whole register request is vetted.
	if service.Service != structs.ConsulServiceName {
		if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(service.Service, &authzCtx); err != nil {
			return err
		}
	}

	// Proxies must have write permission on their destination.
	if service.Kind == structs.ServiceKindConnectProxy {
		destination := service.Proxy.DestinationServiceName
		if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(destination, &authzCtx); err != nil {
			return err
		}
	}

	return nil
}
// checkPreApply prepares a check before it is applied to Raft: a check that
// has a name but no CheckID gets its CheckID defaulted to the name. Checks
// that already carry an ID, or have no name, are left untouched.
func checkPreApply(check *structs.HealthCheck) {
	if check.CheckID != "" || check.Name == "" {
		return
	}
	check.CheckID = types.CheckID(check.Name)
}
// vetRegisterWithACL applies the given ACL's policy to the catalog update and
// determines if it is allowed. Because a catalog register request is so
// dynamic, the algorithm is fairly involved and lives outside of the endpoint.
// The NodeServices record for the node must be supplied, and may be nil.
//
// This is a bit racy because the state store has to be consulted outside of a
// transaction. That is the best we can do since we don't want to flow ACL
// checking down there. Node information doesn't change in practice, so this
// is fine. Should node addresses become changeable in a later version, the
// catalog API should be split at the node and service level so that the race
// can be addressed better (even then it would be very rare and would at worst
// let a service update revert a recent node update, so it doesn't open up
// much room for abuse).
func vetRegisterWithACL(
	authz resolver.Result,
	subj *structs.RegisterRequest,
	ns *structs.NodeServices,
) error {
	var authzContext acl.AuthorizerContext
	subj.FillAuthzContext(&authzContext)

	// Vet the node info. Node write is only demanded when the node is new or
	// the request changes it, so that service updates can re-post the
	// required node info without needing node "write" privileges.
	if ns == nil || subj.ChangesNode(ns.Node) {
		if err := authz.ToAllowAuthorizer().NodeWriteAllowed(subj.Node, &authzContext); err != nil {
			return err
		}
	}

	// Vet the service change: the caller must be able to register the given
	// service and to write to any existing service that is being modified by
	// ID.
	if svc := subj.Service; svc != nil {
		if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(svc.Service, &authzContext); err != nil {
			return err
		}

		if ns != nil {
			if existing, found := ns.Services[svc.ID]; found {
				// This is effectively a delete, so we DO NOT apply the
				// sentinel scope to the service being overwritten, only
				// the regular ACL policy.
				var existingCtx acl.AuthorizerContext
				existing.FillAuthzContext(&existingCtx)

				// NOTE(review): unlike the other branches, this returns the
				// generic acl.ErrPermissionDenied rather than the
				// authorizer's own error - confirm this is intentional.
				if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(existing.Service, &existingCtx); err != nil {
					return acl.ErrPermissionDenied
				}
			}
		}
	}

	// The single-check member must have been flattened into Checks before
	// we got here, which saves us from vetting it separately.
	if subj.Check != nil {
		return fmt.Errorf("check member must be nil")
	}

	// Vet the checks. Node-level checks require node write, and
	// service-level checks require service write.
	for _, chk := range subj.Checks {
		// The check's node must match the request's node - mixing in checks
		// from other nodes isn't allowed because vetting them would require
		// pulling a lot more state store data. This match is required for
		// correctness with respect to ACLs.
		if !strings.EqualFold(chk.Node, subj.Node) {
			return fmt.Errorf("Node '%s' for check '%s' doesn't match register request node '%s'",
				chk.Node, chk.CheckID, subj.Node)
		}

		switch {
		case chk.ServiceID == "":
			// Node-level check.
			if err := authz.ToAllowAuthorizer().NodeWriteAllowed(subj.Node, &authzContext); err != nil {
				return err
			}
			continue

		case subj.Service != nil && subj.Service.ID == chk.ServiceID:
			// Service-level check for the service in this very request,
			// which was already vetted above and may be getting registered
			// together with its checks.
			continue
		}

		// Service-level check for some other service; the caller needs
		// write permission for that service, which must already exist.
		if ns == nil {
			return fmt.Errorf("Unknown service ID '%s' for check ID '%s'", chk.ServiceID, chk.CheckID)
		}
		target, found := ns.Services[chk.ServiceID]
		if !found {
			return fmt.Errorf("Unknown service ID '%s' for check ID '%s'", chk.ServiceID, chk.CheckID)
		}

		// Only a check is being added here, so no scope is added: the
		// sentinel policy doesn't apply to adding checks at this time.
		var targetCtx acl.AuthorizerContext
		target.FillAuthzContext(&targetCtx)

		if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(target.Service, &targetCtx); err != nil {
			return err
		}
	}

	return nil
}
// Deregister a service or check in a node, or the entire node itself.
//
// If a ServiceID is provided in the request, any associated Checks
// with that service are also deregistered.
//
// If a ServiceID or CheckID is not provided in the request, the entire
// node is deregistered.
2013-12-19 20:03:57 +00:00
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, reply); done {
2013-12-11 23:34:10 +00:00
return err
}
defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now())
2013-12-11 23:34:10 +00:00
2013-12-24 20:43:34 +00:00
// Verify the args
if args.Node == "" {
return fmt.Errorf("Must provide node")
}
// Fetch the ACL token, if any.
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
2013-12-11 23:34:10 +00:00
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, true); err != nil {
return err
}
// Check the complete deregister request against the given ACL policy.
state := c.srv.fsm.State()
var ns *structs.NodeService
if args.ServiceID != "" {
_, ns, err = state.NodeService(nil, args.Node, args.ServiceID, &args.EnterpriseMeta, args.PeerName)
if err != nil {
return fmt.Errorf("Service lookup failed: %v", err)
}
}
var nc *structs.HealthCheck
if args.CheckID != "" {
_, nc, err = state.NodeCheck(args.Node, args.CheckID, &args.EnterpriseMeta, args.PeerName)
if err != nil {
return fmt.Errorf("Check lookup failed: %v", err)
}
}
if err := vetDeregisterWithACL(authz, args, ns, nc); err != nil {
return err
}
_, err = c.srv.raftApply(structs.DeregisterRequestType, args)
return err
}
2013-12-12 18:35:50 +00:00
// vetDeregisterWithACL applies the given ACL's policy to the catalog update
// and determines if it is allowed. Because a catalog deregister request is so
// dynamic, the algorithm is fairly involved and lives outside of the endpoint.
// The NodeService for the referenced service must be supplied and may be nil;
// the same goes for the HealthCheck for the referenced health check.
func vetDeregisterWithACL(
	authz resolver.Result,
	subj *structs.DeregisterRequest,
	ns *structs.NodeService,
	nc *structs.HealthCheck,
) error {
	// Sentinel is not applied on this path, since at this time it only
	// covers create and update operations.

	// Start out with the request's defaults, which is what the node write
	// check needs.
	var authzContext acl.AuthorizerContext
	subj.FillAuthzContext(&authzContext)

	allow := authz.ToAllowAuthorizer()

	// Node write permission is enough to deregister anything on the node.
	// This covers agents that no longer hold a token with write permission
	// on the service they need to deregister.
	nodeWriteErr := allow.NodeWriteAllowed(subj.Node, &authzContext)
	if nodeWriteErr == nil {
		return nil
	}

	// The order of the cases must match applyDeregister() in
	// fsm/commands_oss.go, which evaluates the fields in the same order and
	// ignores the lower-precedence ones. That lets us ignore them from an
	// ACL perspective too.
	switch {
	case subj.ServiceID != "":
		if ns == nil {
			return fmt.Errorf("Unknown service ID '%s'", subj.ServiceID)
		}

		ns.FillAuthzContext(&authzContext)
		if err := allow.ServiceWriteAllowed(ns.Service, &authzContext); err != nil {
			return err
		}

	case subj.CheckID != "":
		if nc == nil {
			return fmt.Errorf("Unknown check ID '%s'", subj.CheckID)
		}

		nc.FillAuthzContext(&authzContext)

		// Service-level checks need service write; node-level checks need
		// node write.
		if nc.ServiceID != "" {
			if err := allow.ServiceWriteAllowed(nc.ServiceName, &authzContext); err != nil {
				return err
			}
		} else {
			if err := allow.NodeWriteAllowed(subj.Node, &authzContext); err != nil {
				return err
			}
		}

	default:
		// Whole-node deregistration. Node write was not granted - the
		// early return above would have fired otherwise - so deny.
		return nodeWriteErr
	}

	return nil
}
2013-12-12 18:35:50 +00:00
// ListDatacenters is used to query for the list of known datacenters
func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]string) error {
dcs, err := c.srv.router.GetDatacentersByDistance()
if err != nil {
return err
}
if len(dcs) == 0 { // no WAN federation, so return the local data center name
dcs = []string{c.srv.config.Datacenter}
}
2013-12-12 18:35:50 +00:00
*reply = dcs
return nil
}
2013-12-12 18:48:36 +00:00
// ListNodes is used to query the nodes in a DC.
func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error {
if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, reply); done {
2013-12-12 18:48:36 +00:00
return err
}
filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
if err != nil {
return err
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
2017-04-21 00:46:29 +00:00
func(ws memdb.WatchSet, state *state.Store) error {
var err error
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
} else {
reply.Index, reply.Nodes, err = state.Nodes(ws, &args.EnterpriseMeta, args.PeerName)
}
if err != nil {
return err
}
if isUnmodified(args.QueryOptions, reply.Index) {
reply.QueryMeta.NotModified = true
reply.Nodes = nil
return nil
}
raw, err := filter.Execute(reply.Nodes)
if err != nil {
return err
}
reply.Nodes = raw.(structs.Nodes)
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return c.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
})
2013-12-12 18:48:36 +00:00
}
2013-12-12 19:07:14 +00:00
// isUnmodified reports whether a query result at the given index can be
// answered with a "not modified" response: the caller must have allowed such
// responses and supplied a non-zero MinQueryIndex equal to index.
func isUnmodified(opts structs.QueryOptions, index uint64) bool {
	if !opts.AllowNotModifiedResponse || opts.MinQueryIndex == 0 {
		return false
	}
	return opts.MinQueryIndex == index
}
// ListServices is used to query the services in a DC.
// Returns services as a map of service names to available tags.
func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error {
if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, reply); done {
return err
}
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
2013-12-12 19:07:14 +00:00
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// Set reply enterprise metadata after resolving and validating the token so
// that we can properly infer metadata from the token.
reply.EnterpriseMeta = args.EnterpriseMeta
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
2017-04-21 00:46:29 +00:00
func(ws memdb.WatchSet, state *state.Store) error {
var err error
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
} else {
reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
}
if err != nil {
return err
}
if isUnmodified(args.QueryOptions, reply.Index) {
reply.Services = nil
reply.QueryMeta.NotModified = true
return nil
}
c.srv.filterACLWithAuthorizer(authz, reply)
return nil
})
2013-12-12 19:07:14 +00:00
}
2013-12-12 19:37:19 +00:00
// ServiceList returns the services in a DC as a list of ServiceNames,
// filtered by the caller's ACLs.
func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error {
	done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, reply)
	if done {
		return err
	}

	authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
	if err != nil {
		return err
	}
	if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
		return err
	}

	return c.srv.blockingQuery(
		&args.QueryOptions,
		&reply.QueryMeta,
		func(ws memdb.WatchSet, store *state.Store) error {
			idx, names, err := store.ServiceList(ws, &args.EnterpriseMeta, args.PeerName)
			if err != nil {
				return err
			}

			// Only touch the reply once the lookup has succeeded.
			reply.Index = idx
			reply.Services = names

			c.srv.filterACLWithAuthorizer(authz, reply)
			return nil
		})
}
// ServiceNodes returns all the nodes registered as part of a service.
func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error {
if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, reply); done {
2013-12-12 19:37:19 +00:00
return err
}
2013-12-24 20:43:34 +00:00
// Verify the arguments
if args.ServiceName == "" && args.ServiceAddress == "" {
2013-12-24 20:43:34 +00:00
return fmt.Errorf("Must provide service name")
}
// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error)
switch {
case args.Connect:
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
return s.ConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
default:
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
if args.ServiceAddress != "" {
return s.ServiceAddressNodes(ws, args.ServiceAddress, &args.EnterpriseMeta, args.PeerName)
}
if args.TagFilter {
tags := args.ServiceTags
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
// Agents < v1.3.0 populate the ServiceTag field. In this case,
// use ServiceTag instead of the ServiceTags field.
if args.ServiceTag != "" {
tags = []string{args.ServiceTag}
}
return s.ServiceTagNodes(ws, args.ServiceName, tags, &args.EnterpriseMeta, args.PeerName)
}
return s.ServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
}
var authzContext acl.AuthorizerContext
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// If we're doing a connect query, we need read access to the service
// we're trying to find proxies for, so check that.
if args.Connect {
// TODO(acl-error-enhancements) can this be improved? What happens if we returned an error here?
// Is this similar to filters where we might want to return a hint?
2021-07-30 18:28:19 +00:00
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
// Just return nil, which will return an empty response (tested)
return nil
}
}
filter, err := bexpr.CreateFilter(args.Filter, nil, reply.ServiceNodes)
if err != nil {
return err
}
var (
priorMergeHash uint64
ranMergeOnce bool
)
err = c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
2017-04-21 00:46:29 +00:00
func(ws memdb.WatchSet, state *state.Store) error {
index, services, err := f(ws, state)
if err != nil {
return err
}
mergedServices := services
if args.MergeCentralConfig {
var mergedServiceNodes structs.ServiceNodes
for _, sn := range services {
mergedsn := sn
ns := sn.ToNodeService()
if ns.IsSidecarProxy() || ns.IsGateway() {
cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, c.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
mergedsn = mergedns.ToServiceNode(sn.Node)
}
mergedServiceNodes = append(mergedServiceNodes, mergedsn)
}
if len(mergedServiceNodes) > 0 {
mergedServices = mergedServiceNodes
}
// Generate a hash of the mergedServices driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
}
}
reply.Index, reply.ServiceNodes = index, mergedServices
if len(args.NodeMetaFilters) > 0 {
var filtered structs.ServiceNodes
for _, service := range mergedServices {
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
filtered = append(filtered, service)
}
}
reply.ServiceNodes = filtered
}
// This is safe to do even when the filter is nil - its just a no-op then
raw, err := filter.Execute(reply.ServiceNodes)
if err != nil {
return err
}
reply.ServiceNodes = raw.(structs.ServiceNodes)
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes)
})
// Provide some metrics
if err == nil {
// For metrics, we separate Connect-based lookups from non-Connect
key := "service"
if args.Connect {
key = "connect"
}
metrics.IncrCounterWithLabels([]string{"catalog", key, "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(args.ServiceTags) > 0 {
New ACLs (#4791) This PR is almost a complete rewrite of the ACL system within Consul. It brings the features more in line with other HashiCorp products. Obviously there is quite a bit left to do here but most of it is related docs, testing and finishing the last few commands in the CLI. I will update the PR description and check off the todos as I finish them over the next few days/week. Description At a high level this PR is mainly to split ACL tokens from Policies and to split the concepts of Authorization from Identities. A lot of this PR is mostly just to support CRUD operations on ACLTokens and ACLPolicies. These in and of themselves are not particularly interesting. The bigger conceptual changes are in how tokens get resolved, how backwards compatibility is handled and the separation of policy from identity which could lead the way to allowing for alternative identity providers. On the surface and with a new cluster the ACL system will look very similar to that of Nomads. Both have tokens and policies. Both have local tokens. The ACL management APIs for both are very similar. I even ripped off Nomad's ACL bootstrap resetting procedure. There are a few key differences though. Nomad requires token and policy replication where Consul only requires policy replication with token replication being opt-in. In Consul local tokens only work with token replication being enabled though. All policies in Nomad are globally applicable. In Consul all policies are stored and replicated globally but can be scoped to a subset of the datacenters. This allows for more granular access management. Unlike Nomad, Consul has legacy baggage in the form of the original ACL system. The ramifications of this are: A server running the new system must still support other clients using the legacy system. A client running the new system must be able to use the legacy RPCs when the servers in its datacenter are running the legacy system. 
The primary ACL DC's servers running in legacy mode needs to be a gate that keeps everything else in the entire multi-DC cluster running in legacy mode. So not only does this PR implement the new ACL system but has a legacy mode built in for when the cluster isn't ready for new ACLs. Also detecting that new ACLs can be used is automatic and requires no configuration on the part of administrators. This process is detailed more in the "Transitioning from Legacy to New ACL Mode" section below.
2018-10-19 16:04:07 +00:00
// Sort tags so that the metric is the same even if the request
// tags are in a different order
sort.Strings(args.ServiceTags)
// Build metric labels
labels := []metrics.Label{{Name: "service", Value: args.ServiceName}}
for _, tag := range args.ServiceTags {
labels = append(labels, metrics.Label{Name: "tag", Value: tag})
}
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tags"}, 1, labels)
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounterWithLabels([]string{"catalog", key, "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
2013-12-12 19:37:19 +00:00
}
2013-12-12 19:46:25 +00:00
// NodeServices returns all the services registered as part of a node.
// Returns NodeServices as a map of service IDs to services.
func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error {
if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, reply); done {
2013-12-12 19:46:25 +00:00
return err
}
2013-12-24 20:43:34 +00:00
// Verify the arguments
if args.Node == "" {
return fmt.Errorf("Must provide node")
}
var filterType map[string]*structs.NodeService
filter, err := bexpr.CreateFilter(args.Filter, nil, filterType)
if err != nil {
return err
}
_, err = c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
2017-04-21 00:46:29 +00:00
func(ws memdb.WatchSet, state *state.Store) error {
index, services, err := state.NodeServices(ws, args.Node, &args.EnterpriseMeta, args.PeerName)
if err != nil {
return err
}
reply.Index, reply.NodeServices = index, services
if reply.NodeServices != nil {
raw, err := filter.Execute(reply.NodeServices.Services)
if err != nil {
return err
}
reply.NodeServices.Services = raw.(map[string]*structs.NodeService)
}
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission.
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
2013-12-12 19:46:25 +00:00
}
// NodeServiceList returns all the services registered as part of a node.
// Returns NodeServices as a list of services.
//
// When args.MergeCentralConfig is set, each sidecar proxy or gateway service
// is passed through mergeNodeServiceWithCentralConfig before being returned.
// In that mode a hash of the merged list is remembered across executions of
// the blocking-query callback; if a later execution produces the same hash the
// callback returns errNotChanged and leaves the previous reply contents in
// place.
//
// Errors from argument validation, filter creation, token resolution, the
// state store, config merging, hashing, filter execution and ACL filtering are
// returned to the caller.
func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error {
	if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, reply); done {
		return err
	}

	// Verify the arguments
	if args.Node == "" {
		return fmt.Errorf("Must provide node")
	}

	var filterType []*structs.NodeService
	filter, err := bexpr.CreateFilter(args.Filter, nil, filterType)
	if err != nil {
		return err
	}

	_, err = c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
	if err != nil {
		return err
	}

	if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
		return err
	}

	// These live outside the callback so that they survive from one execution
	// of the blocking query to the next.
	var (
		priorMergeHash uint64
		ranMergeOnce   bool
	)

	return c.srv.blockingQuery(
		&args.QueryOptions,
		&reply.QueryMeta,
		func(ws memdb.WatchSet, state *state.Store) error {
			index, services, err := state.NodeServiceList(ws, args.Node, &args.EnterpriseMeta, args.PeerName)
			if err != nil {
				return err
			}

			mergedServices := services

			if services != nil && args.MergeCentralConfig {
				var mergedNodeServices []*structs.NodeService
				for _, ns := range services.Services {
					mergedns := ns
					if ns.IsSidecarProxy() || ns.IsGateway() {
						serviceSpecificReq := structs.ServiceSpecificRequest{
							Datacenter:   args.Datacenter,
							QueryOptions: args.QueryOptions,
						}
						cfgIndex, merged, err := mergeNodeServiceWithCentralConfig(ws, state, &serviceSpecificReq, ns, c.logger)
						if err != nil {
							return err
						}
						// The reply index must cover the config that was merged in.
						if cfgIndex > index {
							index = cfgIndex
						}
						mergedns = merged
					}
					mergedNodeServices = append(mergedNodeServices, mergedns)
				}
				if len(mergedNodeServices) > 0 {
					mergedServices.Services = mergedNodeServices
				}

				// Generate a hash of the mergedServices driving this response.
				// Use it to determine if the response is identical to a prior wakeup.
				newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
				if err != nil {
					return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
				}

				if ranMergeOnce && priorMergeHash == newMergeHash {
					reply.Index = index
					// NOTE: the prior response is still alive inside of *reply, which is desirable
					return errNotChanged
				}
				priorMergeHash = newMergeHash
				ranMergeOnce = true
			}

			reply.Index = index
			if mergedServices == nil {
				// The callback can run several times against the same reply.
				// Without this reset, a node that disappears between runs would
				// be reported with the services found by the previous run.
				reply.NodeServices = structs.NodeServiceList{}
			} else {
				reply.NodeServices = *mergedServices

				raw, err := filter.Execute(reply.NodeServices.Services)
				if err != nil {
					return err
				}
				reply.NodeServices.Services = raw.([]*structs.NodeService)
			}

			// Note: we filter the results with ACLs *after* applying the user-supplied
			// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
			// results that would be filtered out even if the user did have permission.
			if err := c.srv.filterACL(args.Token, reply); err != nil {
				return err
			}

			return nil
		})
}
2020-06-12 02:05:07 +00:00
func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error {
if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, reply); done {
2020-06-12 02:05:07 +00:00
return err
}
var authzContext acl.AuthorizerContext
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(args.ServiceName, &authzContext); err != nil {
return err
2020-06-12 02:05:07 +00:00
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.GatewayServices
supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway}
var found bool
for _, kind := range supportedGateways {
// We only use this call to validate the RPC call, don't add the watch set
_, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if entry != nil {
found = true
break
2020-06-12 02:05:07 +00:00
}
}
// We log a warning here to indicate that there is a potential
// misconfiguration. We explicitly do NOT return an error because this
// can occur in the course of normal operation by deleting a
// configuration entry or starting the proxy before registering the
// config entry.
if !found {
c.logger.Warn("no terminating-gateway or ingress-gateway associated with this gateway",
"gateway", args.ServiceName,
)
}
index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index, reply.Services = index, services
2020-06-12 02:05:07 +00:00
if err := c.srv.filterACL(args.Token, reply); err != nil {
2020-06-12 02:05:07 +00:00
return err
}
return nil
})
}
// VirtualIPForService writes into *reply the value returned by the state
// store's VirtualIPForService for the service named in args, qualified by
// args.PeerName and args.EnterpriseMeta.
//
// The token must pass ServiceReadAllowed for args.ServiceName. Unlike the
// other lookups in this file, this one reads c.srv.fsm.State() once rather
// than running inside blockingQuery. *reply is assigned even when the state
// store returns an error.
func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, reply *string) error {
	done, fwdErr := c.srv.ForwardRPC("Catalog.VirtualIPForService", args, reply)
	if done {
		return fwdErr
	}

	var authzContext acl.AuthorizerContext
	authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
	if err != nil {
		return err
	}

	if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
		return err
	}

	if err := authz.ToAllowAuthorizer().ServiceReadAllowed(args.ServiceName, &authzContext); err != nil {
		return err
	}

	store := c.srv.fsm.State()
	psn := structs.PeeredServiceName{
		Peer:        args.PeerName,
		ServiceName: structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta),
	}

	vip, err := store.VirtualIPForService(psn)
	*reply = vip
	return err
}