package consul

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/sentinel"
	"github.com/hashicorp/golang-lru"
)

// These must be kept in sync with the constants in command/agent/acl.go.
const (
	// anonymousToken is the token ID we re-write to if there is no token ID
	// provided.
	anonymousToken = "anonymous"

	// redactedToken is shown in structures with embedded tokens when they
	// are not allowed to be displayed.
	redactedToken = "<hidden>"

	// aclCacheSize is the maximum number of cached ACL entries.
	aclCacheSize = 10 * 1024
)

// aclCacheEntry is used to cache non-authoritative ACLs.
// If non-authoritative, then we must respect a TTL.
type aclCacheEntry struct {
	ACL     acl.ACL
	Expires time.Time
	ETag    string
}

// aclLocalFault is used by the authoritative ACL cache to fault in the rules
// for an ACL if we take a miss. This goes directly to the state store, so it
// assumes it's running in the ACL datacenter, or in a non-ACL datacenter when
// using its replicated ACLs during an outage.
func (s *Server) aclLocalFault(id string) (string, string, error) {
	defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
	defer metrics.MeasureSince([]string{"acl", "fault"}, time.Now())

	// Query the state store.
	state := s.fsm.State()
	_, rule, err := state.ACLGet(nil, id)
	if err != nil {
		return "", "", err
	}
	if rule == nil {
		return "", "", acl.ErrNotFound
	}

	// Management tokens have no policy and inherit from the 'manage' root
	// policy.
	if rule.Type == structs.ACLTypeManagement {
		return "manage", "", nil
	}

	// Otherwise use the default policy.
	return s.config.ACLDefaultPolicy, rule.Rules, nil
}

// resolveToken is the primary interface used by ACL-checkers (such as an
// endpoint handling a request) to resolve a token. If ACLs aren't enabled
// then this will return a nil token, otherwise it will attempt to use the
// local cache and ultimately the ACL datacenter to get the policy associated
// with the token.
func (s *Server) resolveToken(id string) (acl.ACL, error) {
	// Check if there is no ACL datacenter (ACLs disabled).
	authDC := s.config.ACLDatacenter
	if len(authDC) == 0 {
		return nil, nil
	}
	defer metrics.MeasureSince([]string{"consul", "acl", "resolveToken"}, time.Now())
	defer metrics.MeasureSince([]string{"acl", "resolveToken"}, time.Now())

	// Handle the anonymous token.
	if len(id) == 0 {
		id = anonymousToken
	} else if acl.RootACL(id) != nil {
		return nil, acl.ErrRootDenied
	}

	// If we are the leader in the ACL datacenter, use the authoritative
	// cache.
	if s.config.Datacenter == authDC && s.IsLeader() {
		return s.aclAuthCache.GetACL(id)
	}

	// Otherwise use our non-authoritative cache.
	return s.aclCache.lookupACL(id, authDC)
}

// rpcFn is used to make an RPC call to the client or server.
type rpcFn func(string, interface{}, interface{}) error

// aclCache is used to cache ACLs and policies.
type aclCache struct {
	config *Config
	logger *log.Logger

	// acls is a non-authoritative ACL cache.
	acls *lru.TwoQueueCache

	// sentinel is the code engine (can be nil).
	sentinel sentinel.Evaluator

	// policies is a non-authoritative policy cache.
	policies *lru.TwoQueueCache

	// rpc is a function used to talk to the client/server.
	rpc rpcFn

	// local is a function used to look for an ACL locally if replication is
	// enabled. This will be nil if replication isn't enabled.
	local acl.FaultFunc
}
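// exampleNodeReadGuard is a hypothetical sketch, not part of the original
// file, showing the usage pattern resolveToken implies for endpoint handlers:
// a nil ACL means ACLs are disabled, so nil must be treated as "allow". The
// method name and the NodeRead check are illustrative assumptions.
func (s *Server) exampleNodeReadGuard(token, node string) error {
	rule, err := s.resolveToken(token)
	if err != nil {
		return err
	}
	// A nil ACL means ACLs are disabled, so permit the read.
	if rule != nil && !rule.NodeRead(node) {
		return acl.ErrPermissionDenied
	}
	return nil
}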
// newACLCache returns a new non-authoritative cache for ACLs. This is used
// for performance, both inside the ACL datacenter on non-leader servers and
// everywhere outside the ACL datacenter.
func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn,
	local acl.FaultFunc, sentinel sentinel.Evaluator) (*aclCache, error) {
	var err error
	cache := &aclCache{
		config:   conf,
		logger:   logger,
		rpc:      rpc,
		local:    local,
		sentinel: sentinel,
	}

	// Initialize the non-authoritative ACL cache.
	cache.acls, err = lru.New2Q(aclCacheSize)
	if err != nil {
		return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
	}

	// Initialize the ACL policy cache.
	cache.policies, err = lru.New2Q(aclCacheSize)
	if err != nil {
		return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
	}

	return cache, nil
}
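// exampleNewACLCache is a hypothetical sketch, not part of the original file,
// of how a server might wire up this cache. Passing a nil acl.FaultFunc
// models a server without ACL replication; the real server passes
// s.aclLocalFault when replication is enabled. The nil sentinel.Evaluator is
// allowed per the struct comment above.
func exampleNewACLCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) {
	// With local == nil, lookupACL can never fall back to replicated ACLs
	// and goes straight to the down policy when the ACL datacenter is
	// unreachable.
	return newACLCache(conf, logger, rpc, nil, nil)
}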
// lookupACL is used when we are non-authoritative, and need to resolve an ACL.
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
	// Check the cache for the ACL.
	var cached *aclCacheEntry
	raw, ok := c.acls.Get(id)
	if ok {
		cached = raw.(*aclCacheEntry)
	}

	// Check for a live cache entry.
	if cached != nil && time.Now().Before(cached.Expires) {
		metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
		metrics.IncrCounter([]string{"acl", "cache_hit"}, 1)
		return cached.ACL, nil
	}
	metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
	metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)

	// Attempt to refresh the policy from the ACL datacenter via an RPC.
	args := structs.ACLPolicyRequest{
		Datacenter: authDC,
		ACL:        id,
	}
	if cached != nil {
		args.ETag = cached.ETag
	}
	var reply structs.ACLPolicy
	err := c.rpc("ACL.GetPolicy", &args, &reply)
	if err == nil {
		return c.useACLPolicy(id, authDC, cached, &reply)
	}

	// Check for not-found, which will cause us to bail immediately. For any
	// other error we report it in the logs but can continue.
	if acl.IsErrNotFound(err) {
		return nil, acl.ErrNotFound
	}
	c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)

	// TODO (slackpad) - We could do a similar thing *within* the ACL
	// datacenter if the leader isn't available. We have a local state
	// store of the ACLs, so by populating the local member in this cache,
	// it would fall back to the state store if there was a leader loss and
	// the extend-cache policy was true. This feels subtle to explain and
	// configure, and leader blips should be paved over by cache already, so
	// we won't do this for now but should consider it for the future. This
	// is a lot different than the replication story where you might be cut
	// off from the ACL datacenter for an extended period of time and need
	// to carry on operating with the full set of ACLs as they were known
	// before the partition.

	// At this point we might have an expired cache entry and we know that
	// there was a problem getting the ACL from the ACL datacenter. If a
	// local ACL fault function is registered to query replicated ACL data,
	// and the user's policy allows it, we will try locally before we give
	// up.
	if c.local != nil && c.config.ACLDownPolicy == "extend-cache" {
		parent, rules, err := c.local(id)
		if err != nil {
			// We don't make an exception here for ACLs that aren't
			// found locally. It seems more robust to use an expired
			// cached entry (if we have one) rather than ignore it
			// for the case that replication was a bit behind and
			// didn't have the ACL yet.
			c.logger.Printf("[DEBUG] consul.acl: Failed to get policy from replicated ACLs: %v", err)
			goto ACL_DOWN
		}

		policy, err := acl.Parse(rules, c.sentinel)
		if err != nil {
			c.logger.Printf("[DEBUG] consul.acl: Failed to parse policy for replicated ACL: %v", err)
			goto ACL_DOWN
		}
		policy.ID = acl.RuleID(rules)

		// Fake up an ACL datacenter reply and inject it into the cache.
		// Note we use the local TTL here, so this'll be used for that
		// amount of time even once the ACL datacenter becomes available.
		metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1)
		metrics.IncrCounter([]string{"acl", "replication_hit"}, 1)
		reply.ETag = makeACLETag(parent, policy)
		reply.TTL = c.config.ACLTTL
		reply.Parent = parent
		reply.Policy = policy
		return c.useACLPolicy(id, authDC, cached, &reply)
	}

ACL_DOWN:
	// Unable to refresh, so apply the down policy.
	switch c.config.ACLDownPolicy {
	case "allow":
		return acl.AllowAll(), nil
	case "extend-cache":
		if cached != nil {
			return cached.ACL, nil
		}
		fallthrough
	default:
		return acl.DenyAll(), nil
	}
}

// useACLPolicy handles an ACLPolicy response.
func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
	// Check if we can use the cached policy.
	if cached != nil && cached.ETag == p.ETag {
		if p.TTL > 0 {
			// TODO (slackpad) - This seems like it's an unsafe
			// write.
			cached.Expires = time.Now().Add(p.TTL)
		}
		return cached.ACL, nil
	}

	// Check for a cached compiled policy.
	var compiled acl.ACL
	raw, ok := c.policies.Get(p.ETag)
	if ok {
		compiled = raw.(acl.ACL)
	} else {
		// Resolve the parent policy.
		parent := acl.RootACL(p.Parent)
		if parent == nil {
			var err error
			parent, err = c.lookupACL(p.Parent, authDC)
			if err != nil {
				return nil, err
			}
		}

		// Compile the ACL.
		acl, err := acl.New(parent, p.Policy, c.sentinel)
		if err != nil {
			return nil, err
		}

		// Cache the compiled policy.
		c.policies.Add(p.ETag, acl)
		compiled = acl
	}

	// Cache the ACL.
	cached = &aclCacheEntry{
		ACL:  compiled,
		ETag: p.ETag,
	}
	if p.TTL > 0 {
		cached.Expires = time.Now().Add(p.TTL)
	}
	c.acls.Add(id, cached)
	return compiled, nil
}

// aclFilter is used to filter results from our state store based on ACL rules
// configured for the provided token.
type aclFilter struct {
	acl             acl.ACL
	logger          *log.Logger
	enforceVersion8 bool
}

// newACLFilter constructs a new aclFilter.
func newACLFilter(acl acl.ACL, logger *log.Logger, enforceVersion8 bool) *aclFilter {
	if logger == nil {
		logger = log.New(os.Stderr, "", log.LstdFlags)
	}
	return &aclFilter{
		acl:             acl,
		logger:          logger,
		enforceVersion8: enforceVersion8,
	}
}

// allowNode is used to determine if a node is accessible for an ACL.
func (f *aclFilter) allowNode(node string) bool {
	if !f.enforceVersion8 {
		return true
	}
	return f.acl.NodeRead(node)
}

// allowService is used to determine if a service is accessible for an ACL.
func (f *aclFilter) allowService(service string) bool {
	if service == "" {
		return true
	}
	if !f.enforceVersion8 && service == structs.ConsulServiceID {
		return true
	}
	return f.acl.ServiceRead(service)
}

// allowSession is used to determine if a session for a node is accessible for
// an ACL.
func (f *aclFilter) allowSession(node string) bool {
	if !f.enforceVersion8 {
		return true
	}
	return f.acl.SessionRead(node)
}
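// exampleDownPolicy is a hypothetical sketch, not part of the original file,
// that mirrors the down-policy switch lookupACL applies when the ACL
// datacenter cannot be reached: "allow" fails open, "extend-cache" reuses an
// expired entry if one exists, and anything else (including "deny") fails
// closed.
func exampleDownPolicy(policy string, cached *aclCacheEntry) acl.ACL {
	switch policy {
	case "allow":
		return acl.AllowAll()
	case "extend-cache":
		// Reuse the expired cache entry if we still have one.
		if cached != nil {
			return cached.ACL
		}
		fallthrough
	default:
		return acl.DenyAll()
	}
}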
// filterHealthChecks is used to filter a set of health checks down based on
// the configured ACL rules for a token.
func (f *aclFilter) filterHealthChecks(checks *structs.HealthChecks) {
	hc := *checks
	for i := 0; i < len(hc); i++ {
		check := hc[i]
		if f.allowNode(check.Node) && f.allowService(check.ServiceName) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping check %q from result due to ACLs", check.CheckID)
		hc = append(hc[:i], hc[i+1:]...)
		i--
	}
	*checks = hc
}

// filterServices is used to filter a set of services based on ACLs.
func (f *aclFilter) filterServices(services structs.Services) {
	for svc := range services {
		if f.allowService(svc) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
		delete(services, svc)
	}
}

// filterServiceNodes is used to filter a set of nodes for a given service
// based on the configured ACL rules.
func (f *aclFilter) filterServiceNodes(nodes *structs.ServiceNodes) {
	sn := *nodes
	for i := 0; i < len(sn); i++ {
		node := sn[i]
		if f.allowNode(node.Node) && f.allowService(node.ServiceName) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node.Node)
		sn = append(sn[:i], sn[i+1:]...)
		i--
	}
	*nodes = sn
}

// filterNodeServices is used to filter services on a given node based on ACLs.
func (f *aclFilter) filterNodeServices(services **structs.NodeServices) {
	if *services == nil {
		return
	}

	if !f.allowNode((*services).Node.Node) {
		*services = nil
		return
	}

	for svc := range (*services).Services {
		if f.allowService(svc) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
		delete((*services).Services, svc)
	}
}

// filterCheckServiceNodes is used to filter nodes based on ACL rules.
func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
	csn := *nodes
	for i := 0; i < len(csn); i++ {
		node := csn[i]
		if f.allowNode(node.Node.Node) && f.allowService(node.Service.Service) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node.Node.Node)
		csn = append(csn[:i], csn[i+1:]...)
		i--
	}
	*nodes = csn
}

// filterSessions is used to filter a set of sessions based on ACLs.
func (f *aclFilter) filterSessions(sessions *structs.Sessions) {
	s := *sessions
	for i := 0; i < len(s); i++ {
		session := s[i]
		if f.allowSession(session.Node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping session %q from result due to ACLs", session.ID)
		s = append(s[:i], s[i+1:]...)
		i--
	}
	*sessions = s
}

// filterCoordinates is used to filter nodes in a coordinate dump based on ACL
// rules.
func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) {
	c := *coords
	for i := 0; i < len(c); i++ {
		node := c[i].Node
		if f.allowNode(node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
		c = append(c[:i], c[i+1:]...)
		i--
	}
	*coords = c
}
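// exampleFilterSessions is a hypothetical sketch, not part of the original
// file, showing the filter helpers in action: with a deny-all ACL and
// version 8 enforcement, every session is dropped in place. The function
// name and test-style values are illustrative assumptions.
func exampleFilterSessions(logger *log.Logger, sessions structs.Sessions) structs.Sessions {
	filt := newACLFilter(acl.DenyAll(), logger, true)
	filt.filterSessions(&sessions)
	return sessions // empty after filtering
}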
// filterNodeDump is used to filter through all parts of a node dump and
// remove elements the provided ACL token cannot access.
func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
	nd := *dump
	for i := 0; i < len(nd); i++ {
		info := nd[i]

		// Filter nodes.
		if node := info.Node; !f.allowNode(node) {
			f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
			nd = append(nd[:i], nd[i+1:]...)
			i--
			continue
		}

		// Filter services.
		for j := 0; j < len(info.Services); j++ {
			svc := info.Services[j].Service
			if f.allowService(svc) {
				continue
			}
			f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
			info.Services = append(info.Services[:j], info.Services[j+1:]...)
			j--
		}

		// Filter checks.
		for j := 0; j < len(info.Checks); j++ {
			chk := info.Checks[j]
			if f.allowService(chk.ServiceName) {
				continue
			}
			f.logger.Printf("[DEBUG] consul: dropping check %q from result due to ACLs", chk.CheckID)
			info.Checks = append(info.Checks[:j], info.Checks[j+1:]...)
			j--
		}
	}
	*dump = nd
}

// filterNodes is used to filter through all parts of a node list and remove
// elements the provided ACL token cannot access.
func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
	n := *nodes
	for i := 0; i < len(n); i++ {
		node := n[i].Node
		if f.allowNode(node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
		n = append(n[:i], n[i+1:]...)
		i--
	}
	*nodes = n
}

// redactPreparedQueryTokens will redact any tokens unless the client has a
// management token. This eases the transition to delegated authority over
// prepared queries, since it was easy to capture management tokens in Consul
// 0.6.3 and earlier, and we don't want to willy-nilly show those. This does
// have the limitation of preventing delegated non-management users from
// seeing captured tokens, but they can at least see whether or not a token
// is set.
func (f *aclFilter) redactPreparedQueryTokens(query **structs.PreparedQuery) {
	// Management tokens can see everything with no filtering.
	if f.acl.ACLList() {
		return
	}

	// Let the user see if there's a blank token, otherwise we need to
	// redact it, since we know they don't have a management token.
	if (*query).Token != "" {
		// Redact the token, using a copy of the query structure
		// since we could be pointed at a live instance from the
		// state store so it's not safe to modify it. Note that
		// this clone will still point to things like underlying
		// arrays in the original, but for modifying just the
		// token it will be safe to use.
		clone := *(*query)
		clone.Token = redactedToken
		*query = &clone
	}
}

// filterPreparedQueries is used to filter prepared queries based on ACL rules.
// We prune entries the user doesn't have access to, and we redact any tokens
// if the user doesn't have a management token.
func (f *aclFilter) filterPreparedQueries(queries *structs.PreparedQueries) {
	// Management tokens can see everything with no filtering.
	if f.acl.ACLList() {
		return
	}

	// Otherwise, we need to see what the token has access to.
	ret := make(structs.PreparedQueries, 0, len(*queries))
	for _, query := range *queries {
		// If no prefix ACL applies to this query then filter it, since
		// we know at this point the user doesn't have a management
		// token, otherwise see what the policy says.
		prefix, ok := query.GetACLPrefix()
		if !ok || !f.acl.PreparedQueryRead(prefix) {
			f.logger.Printf("[DEBUG] consul: dropping prepared query %q from result due to ACLs", query.ID)
			continue
		}

		// Redact any tokens if necessary. We make a copy of just the
		// pointer so we don't mess with the caller's slice.
		final := query
		f.redactPreparedQueryTokens(&final)
		ret = append(ret, final)
	}
	*queries = ret
}
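// exampleRedaction is a hypothetical sketch, not part of the original file,
// of the redaction behavior above: for a non-management token, a prepared
// query with a captured token comes back as a copy whose Token field is
// replaced with redactedToken, leaving the state store's instance untouched.
func exampleRedaction(f *aclFilter, query *structs.PreparedQuery) string {
	f.redactPreparedQueryTokens(&query)
	return query.Token // redactedToken unless f.acl has management rights
}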
// filterACL is used to filter results from our service catalog based on the
// rules configured for the provided token.
func (s *Server) filterACL(token string, subj interface{}) error {
	// Get the ACL from the token.
	acl, err := s.resolveToken(token)
	if err != nil {
		return err
	}

	// Fast path if ACLs are not enabled.
	if acl == nil {
		return nil
	}

	// Create the filter.
	filt := newACLFilter(acl, s.logger, s.config.ACLEnforceVersion8)

	switch v := subj.(type) {
	case *structs.CheckServiceNodes:
		filt.filterCheckServiceNodes(v)

	case *structs.IndexedCheckServiceNodes:
		filt.filterCheckServiceNodes(&v.Nodes)

	case *structs.IndexedCoordinates:
		filt.filterCoordinates(&v.Coordinates)

	case *structs.IndexedHealthChecks:
		filt.filterHealthChecks(&v.HealthChecks)

	case *structs.IndexedNodeDump:
		filt.filterNodeDump(&v.Dump)

	case *structs.IndexedNodes:
		filt.filterNodes(&v.Nodes)

	case *structs.IndexedNodeServices:
		filt.filterNodeServices(&v.NodeServices)

	case *structs.IndexedServiceNodes:
		filt.filterServiceNodes(&v.ServiceNodes)

	case *structs.IndexedServices:
		filt.filterServices(v.Services)

	case *structs.IndexedSessions:
		filt.filterSessions(&v.Sessions)

	case *structs.IndexedPreparedQueries:
		filt.filterPreparedQueries(&v.Queries)

	case **structs.PreparedQuery:
		filt.redactPreparedQueryTokens(v)

	default:
		panic(fmt.Errorf("Unhandled type passed to ACL filter: %#v", subj))
	}

	return nil
}
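// exampleFilterNodes is a hypothetical sketch, not part of the original file,
// of how an endpoint typically uses filterACL: resolve and filter in one
// call against the reply structure, with unknown types caught by the panic
// in the default case above. The method name is an illustrative assumption.
func (s *Server) exampleFilterNodes(token string, reply *structs.IndexedNodes) error {
	// filterACL resolves the token itself, so callers only pass the raw
	// token from the RPC request along with the reply to be pruned.
	return s.filterACL(token, reply)
}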
// vetRegisterWithACL applies the given ACL's policy to the catalog update and
// determines if it is allowed. Since the catalog register request is so
// dynamic, this is a pretty complex algorithm and was worth breaking out of
// the endpoint. The NodeServices record for the node must be supplied, and
// can be nil.
//
// This is a bit racy because we have to check the state store outside of a
// transaction. It's the best we can do because we don't want to flow ACL
// checking down there. The node information doesn't change in practice, so
// this will be fine. If we expose ways to change node addresses in a later
// version, then we should split the catalog API at the node and service
// level so we can address this race better (even then it would be super
// rare, and would at worst let a service update revert a recent node update,
// so it doesn't open up too much abuse).
func vetRegisterWithACL(rule acl.ACL, subj *structs.RegisterRequest,
	ns *structs.NodeServices) error {
	// Fast path if ACLs are not enabled.
	if rule == nil {
		return nil
	}

	// This gets called potentially from a few spots so we save it and
	// return the structure we made if we have it.
	var memo map[string]interface{}
	scope := func() map[string]interface{} {
		if memo != nil {
			return memo
		}

		node := &api.Node{
			ID:              string(subj.ID),
			Node:            subj.Node,
			Address:         subj.Address,
			Datacenter:      subj.Datacenter,
			TaggedAddresses: subj.TaggedAddresses,
			Meta:            subj.NodeMeta,
		}

		var service *api.AgentService
		if subj.Service != nil {
			service = &api.AgentService{
				ID:                subj.Service.ID,
				Service:           subj.Service.Service,
				Tags:              subj.Service.Tags,
				Address:           subj.Service.Address,
				Port:              subj.Service.Port,
				EnableTagOverride: subj.Service.EnableTagOverride,
			}
		}

		memo = sentinel.ScopeCatalogUpsert(node, service)
		return memo
	}

	// Vet the node info. This allows service updates to re-post the
	// required node info for each request without having to have node
	// "write" privileges.
	needsNode := ns == nil || subj.ChangesNode(ns.Node)
	if needsNode && !rule.NodeWrite(subj.Node, scope) {
		return acl.ErrPermissionDenied
	}

	// Vet the service change. This includes making sure they can register
	// the given service, and that we can write to any existing service that
	// is being modified by id (if any).
	if subj.Service != nil {
		if !rule.ServiceWrite(subj.Service.Service, scope) {
			return acl.ErrPermissionDenied
		}

		if ns != nil {
			other, ok := ns.Services[subj.Service.ID]

			// This is effectively a delete, so we DO NOT apply the
			// sentinel scope to the service we are overwriting, just
			// the regular ACL policy.
			if ok && !rule.ServiceWrite(other.Service, nil) {
				return acl.ErrPermissionDenied
			}
		}
	}

	// Make sure that the member was flattened before we got here. This
	// keeps us from having to verify this check as well.
	if subj.Check != nil {
		return fmt.Errorf("check member must be nil")
	}

	// Vet the checks. Node-level checks require node write, and
	// service-level checks require service write.
	for _, check := range subj.Checks {
		// Make sure that the node matches - we don't allow you to mix
		// checks from other nodes because we'd have to pull a bunch
		// more state store data to check this. If ACLs are enabled then
		// we simply require them to match in a given request. There's a
		// note in state_store.go to ban this down there in Consul 0.8,
		// but it's good to leave this here because it's required for
		// correctness wrt. ACLs.
		if check.Node != subj.Node {
			return fmt.Errorf("Node '%s' for check '%s' doesn't match register request node '%s'",
				check.Node, check.CheckID, subj.Node)
		}

		// Node-level check.
		if check.ServiceID == "" {
			if !rule.NodeWrite(subj.Node, scope) {
				return acl.ErrPermissionDenied
			}
			continue
		}

		// Service-level check, check the common case where it
		// matches the service part of this request, which has
		// already been vetted above, and might be being registered
		// along with its checks.
		if subj.Service != nil && subj.Service.ID == check.ServiceID {
			continue
		}

		// Service-level check for some other service. Make sure they've
		// got write permissions for that service.
		if ns == nil {
			return fmt.Errorf("Unknown service '%s' for check '%s'", check.ServiceID, check.CheckID)
		}
		other, ok := ns.Services[check.ServiceID]
		if !ok {
			return fmt.Errorf("Unknown service '%s' for check '%s'", check.ServiceID, check.CheckID)
		}

		// We are only adding a check here, so we don't add the scope,
		// since the sentinel policy doesn't apply to adding checks at
		// this time.
		if !rule.ServiceWrite(other.Service, nil) {
			return acl.ErrPermissionDenied
		}
	}

	return nil
}
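// exampleVetRegister is a hypothetical sketch, not part of the original file,
// of how a catalog endpoint is expected to call vetRegisterWithACL: resolve
// the token, read the node's current services outside the transaction, then
// vet. The state store lookup is elided and ns is taken as a parameter, which
// is an illustrative assumption.
func (s *Server) exampleVetRegister(token string, req *structs.RegisterRequest,
	ns *structs.NodeServices) error {
	rule, err := s.resolveToken(token)
	if err != nil {
		return err
	}
	// ns may be nil for a brand-new node; vetRegisterWithACL handles that
	// case by requiring node "write" privileges.
	return vetRegisterWithACL(rule, req, ns)
}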
// vetDeregisterWithACL applies the given ACL's policy to the catalog update
// and determines if it is allowed. Since the catalog deregister request is so
// dynamic, this is a pretty complex algorithm and was worth breaking out of
// the endpoint. The NodeService for the referenced service must be supplied,
// and can be nil; similarly for the HealthCheck for the referenced health
// check.
func vetDeregisterWithACL(rule acl.ACL, subj *structs.DeregisterRequest,
	ns *structs.NodeService, nc *structs.HealthCheck) error {
	// Fast path if ACLs are not enabled.
	if rule == nil {
		return nil
	}

	// We don't apply sentinel in this path, since at this time sentinel
	// only applies to create and update operations.

	// This order must match the code in applyDeregister() in fsm.go since
	// it also evaluates things in this order, and will ignore fields based
	// on this precedence. This lets us also ignore them from an ACL
	// perspective.
	if subj.ServiceID != "" {
		if ns == nil {
			return fmt.Errorf("Unknown service '%s'", subj.ServiceID)
		}
		if !rule.ServiceWrite(ns.Service, nil) {
			return acl.ErrPermissionDenied
		}
	} else if subj.CheckID != "" {
		if nc == nil {
			return fmt.Errorf("Unknown check '%s'", subj.CheckID)
		}
		if nc.ServiceID != "" {
			if !rule.ServiceWrite(nc.ServiceName, nil) {
				return acl.ErrPermissionDenied
			}
		} else {
			if !rule.NodeWrite(subj.Node, nil) {
				return acl.ErrPermissionDenied
			}
		}
	} else {
		if !rule.NodeWrite(subj.Node, nil) {
			return acl.ErrPermissionDenied
		}
	}
	return nil
}
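// exampleVetNodeCheckDeregister is a hypothetical sketch, not part of the
// original file, exercising the middle branch of the precedence above: a
// deregister request naming only a node-level check (nc.ServiceID == "") is
// vetted against node write on the request's node, not against any service.
// The names and request shape are illustrative assumptions.
func exampleVetNodeCheckDeregister(rule acl.ACL, node string, nc *structs.HealthCheck) error {
	req := &structs.DeregisterRequest{
		Node:    node,
		CheckID: nc.CheckID,
	}
	// ns is nil because no ServiceID is set on the request.
	return vetDeregisterWithACL(rule, req, nil, nc)
}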