open-consul/agent/consul/acl.go

package consul

import (
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/sentinel"
	"golang.org/x/time/rate"
)

// These must be kept in sync with the constants in command/agent/acl.go.
const (
	// anonymousToken is the token ID we re-write to if there is no token ID
	// provided.
	anonymousToken = "anonymous"

	// redactedToken is shown in structures with embedded tokens when they
	// are not allowed to be displayed.
	redactedToken = "<hidden>"

	// aclUpgradeBatchSize controls how many tokens we look at during each round
	// of upgrading. Individual raft logs will be further capped using the
	// aclBatchUpsertSize. This limit just prevents us from creating a single
	// slice with all tokens in it.
	aclUpgradeBatchSize = 128

	// aclUpgradeRateLimit is the number of batch upgrade requests per second.
	aclUpgradeRateLimit rate.Limit = 1.0

	// aclBatchDeleteSize is the number of deletions to send in a single batch
	// operation. 4096 should produce a batch that is <150KB in size but should
	// be sufficiently large to handle one replication round in a single batch.
	aclBatchDeleteSize = 4096

	// aclBatchUpsertSize is the target size in bytes we want to submit for a
	// batch upsert request. We estimate the size at runtime due to the data
	// being more variable in its size.
	aclBatchUpsertSize = 256 * 1024

	// DEPRECATED (ACL-Legacy-Compat) aclModeCheck* are all only for legacy usage.
	// aclModeCheckMinInterval is the minimum amount of time between checking if
	// the agent should be using the new or legacy ACL system. All the places it
	// is currently used will back off as they detect that the agent is remaining
	// in legacy mode. However, the initial minimum value is kept small so that
	// new cluster creation can enter the new ACL mode quickly.
	aclModeCheckMinInterval = 50 * time.Millisecond

	// aclModeCheckMaxInterval controls the maximum interval for how often the
	// agent checks if it should be using the new or legacy ACL system.
	aclModeCheckMaxInterval = 30 * time.Second
)
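
// The legacy-mode polling bounded by aclModeCheckMinInterval and
// aclModeCheckMaxInterval amounts to a doubling backoff clamped to those
// bounds. A minimal sketch of such a loop (stillInLegacyMode is a
// hypothetical stand-in for the server's actual mode check, which lives
// elsewhere):
//
//	interval := aclModeCheckMinInterval
//	for stillInLegacyMode() {
//		time.Sleep(interval)
//		interval *= 2
//		if interval > aclModeCheckMaxInterval {
//			interval = aclModeCheckMaxInterval
//		}
//	}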

func minTTL(a time.Duration, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}

// ACLRemoteError is returned when we cannot reach the ACL datacenter to
// resolve a token or policy.
type ACLRemoteError struct {
	Err error
}

func (e ACLRemoteError) Error() string {
	return fmt.Sprintf("Error communicating with the ACL Datacenter: %v", e.Err)
}

// IsACLRemoteError returns whether the given error is an ACLRemoteError.
func IsACLRemoteError(err error) bool {
	_, ok := err.(ACLRemoteError)
	return ok
}

// ACLResolverDelegate implements the helper functionality, such as token and
// policy lookup, that is specific to either a server or a client agent.
type ACLResolverDelegate interface {
	ACLsEnabled() bool
	ACLDatacenter(legacy bool) string
	UseLegacyACLs() bool
	ResolveIdentityFromToken(token string) (bool, structs.ACLIdentity, error)
	ResolvePolicyFromID(policyID string) (bool, *structs.ACLPolicy, error)
	RPC(method string, args interface{}, reply interface{}) error
}

type remoteACLLegacyResult struct {
	authorizer acl.Authorizer
	err        error
}

type remoteACLIdentityResult struct {
	identity structs.ACLIdentity
	err      error
}

type remoteACLPolicyResult struct {
	policy *structs.ACLPolicy
	err    error
}

// ACLResolverConfig holds all the configuration necessary to create an ACLResolver.
type ACLResolverConfig struct {
	Config *Config
	Logger *log.Logger

	// CacheConfig is a pass-through configuration for ACL cache limits.
	CacheConfig *structs.ACLCachesConfig

	// Delegate implements some helper functionality that is server/client specific.
	Delegate ACLResolverDelegate

	// AutoDisable indicates that RPC responses should be checked, and if they
	// indicate that ACLs are disabled remotely then ACLs should be disabled
	// locally as well. This is particularly useful for the client agent so
	// that it can detect when the servers have ACLs enabled.
	AutoDisable bool

	Sentinel sentinel.Evaluator
}

// ACLResolver is the type to handle all your token and policy resolution needs.
//
// Supports:
//   - Resolving tokens locally via the ACLResolverDelegate
//   - Resolving policies locally via the ACLResolverDelegate
//   - Resolving legacy tokens remotely via an ACL.GetPolicy RPC
//   - Resolving tokens remotely via an ACL.TokenRead RPC
//   - Resolving policies remotely via an ACL.PolicyResolve RPC
//
// Remote Resolution:
// Remote resolution can be done synchronously or asynchronously depending
// on the ACLDownPolicy in the Config passed to the resolver.
//
// When the down policy is set to async-cache and we have already cached values,
// then goroutines will be spawned to perform the RPCs in the background
// and then update the cache with either the positive or negative result.
//
// When the down policy is set to extend-cache, or the token/policy is not
// already cached, then the same goroutines are spawned to do the RPCs in the
// background. However, in this mode channels are created to receive the
// results of the RPC and are registered with the resolver. Those channels are
// immediately blocked on for a read.
type ACLResolver struct {
	config *Config
	logger *log.Logger

	delegate ACLResolverDelegate
	sentinel sentinel.Evaluator

	cache *structs.ACLCaches

	asyncIdentityResults      map[string][]chan *remoteACLIdentityResult
	asyncIdentityResultsMutex sync.RWMutex
	asyncPolicyResults        map[string][]chan *remoteACLPolicyResult
	asyncPolicyResultsMutex   sync.RWMutex
	asyncLegacyResults        map[string][]chan *remoteACLLegacyResult
	asyncLegacyMutex          sync.RWMutex

	down acl.Authorizer

	autoDisable  bool
	disabled     time.Time
	disabledLock sync.RWMutex
}

// NewACLResolver constructs an ACLResolver from the given configuration,
// validating it and applying defaults for the logger and down policy.
func NewACLResolver(config *ACLResolverConfig) (*ACLResolver, error) {
	if config == nil {
		return nil, fmt.Errorf("ACL Resolver must be initialized with a config")
	}
	if config.Config == nil {
		return nil, fmt.Errorf("ACLResolverConfig.Config must not be nil")
	}
	if config.Delegate == nil {
		return nil, fmt.Errorf("ACL Resolver must be initialized with a valid delegate")
	}
	if config.Logger == nil {
		config.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}

	cache, err := structs.NewACLCaches(config.CacheConfig)
	if err != nil {
		return nil, err
	}

	var down acl.Authorizer
	switch config.Config.ACLDownPolicy {
	case "allow":
		down = acl.AllowAll()
	case "deny":
		down = acl.DenyAll()
	case "async-cache", "extend-cache":
		// Leave the down policy as nil to signal this.
	default:
		return nil, fmt.Errorf("invalid ACL down policy %q", config.Config.ACLDownPolicy)
	}

	return &ACLResolver{
		config:               config.Config,
		logger:               config.Logger,
		delegate:             config.Delegate,
		sentinel:             config.Sentinel,
		cache:                cache,
		asyncIdentityResults: make(map[string][]chan *remoteACLIdentityResult),
		asyncPolicyResults:   make(map[string][]chan *remoteACLPolicyResult),
		asyncLegacyResults:   make(map[string][]chan *remoteACLLegacyResult),
		autoDisable:          config.AutoDisable,
		down:                 down,
	}, nil
}
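
// A resolver is typically constructed once per agent. A minimal sketch,
// assuming myDelegate implements ACLResolverDelegate and cacheConf carries
// the desired *structs.ACLCachesConfig limits (both hypothetical names):
//
//	conf := DefaultConfig()
//	conf.ACLDownPolicy = "extend-cache"
//	resolver, err := NewACLResolver(&ACLResolverConfig{
//		Config:      conf,
//		Delegate:    myDelegate,
//		CacheConfig: cacheConf,
//		AutoDisable: true, // let a client agent react to servers disabling ACLs
//	})
//	if err != nil {
//		// nil config/delegate or an unrecognized ACLDownPolicy
//	}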

// fireAsyncLegacyResult is used to notify any watchers that legacy resolution
// of a token is complete.
func (r *ACLResolver) fireAsyncLegacyResult(token string, authorizer acl.Authorizer, ttl time.Duration, err error) {
	// cache the result: positive or negative
	r.cache.PutAuthorizerWithTTL(token, authorizer, ttl)

	// get the list of channels to send the result to
	r.asyncLegacyMutex.Lock()
	channels := r.asyncLegacyResults[token]
	delete(r.asyncLegacyResults, token)
	r.asyncLegacyMutex.Unlock()

	// notify all watchers of the RPC results
	result := &remoteACLLegacyResult{authorizer, err}
	for _, cx := range channels {
		// only chans that are being blocked on will be in the list of
		// channels so this cannot block
		cx <- result
		close(cx)
	}
}

// resolveTokenLegacyAsync resolves a legacy token against the ACL datacenter
// via an ACL.GetPolicy RPC and fires the result at any registered watchers.
func (r *ACLResolver) resolveTokenLegacyAsync(token string, cached *structs.AuthorizerCacheEntry) {
	req := structs.ACLPolicyResolveLegacyRequest{
		Datacenter: r.delegate.ACLDatacenter(true),
		ACL:        token,
	}

	cacheTTL := r.config.ACLTokenTTL
	if cached != nil {
		cacheTTL = cached.TTL
	}

	var reply structs.ACLPolicyResolveLegacyResponse
	err := r.delegate.RPC("ACL.GetPolicy", &req, &reply)
	if err == nil {
		parent := acl.RootAuthorizer(reply.Parent)
		if parent == nil {
			// guard against a nil cache entry when the parent policy is invalid
			var cachedAuthorizer acl.Authorizer
			if cached != nil {
				cachedAuthorizer = cached.Authorizer
			}
			r.fireAsyncLegacyResult(token, cachedAuthorizer, cacheTTL, acl.ErrInvalidParent)
			return
		}

		var policies []*acl.Policy
		policy := reply.Policy
		if policy != nil {
			policies = append(policies, policy.ConvertFromLegacy())
		}

		authorizer, err := acl.NewPolicyAuthorizer(parent, policies, r.sentinel)
		r.fireAsyncLegacyResult(token, authorizer, reply.TTL, err)
		return
	}

	if acl.IsErrNotFound(err) {
		// Make sure to remove from the cache if it was deleted
		r.fireAsyncLegacyResult(token, nil, cacheTTL, acl.ErrNotFound)
		return
	}

	// some other RPC error
	switch r.config.ACLDownPolicy {
	case "allow":
		r.fireAsyncLegacyResult(token, acl.AllowAll(), cacheTTL, nil)
		return
	case "async-cache", "extend-cache":
		if cached != nil {
			r.fireAsyncLegacyResult(token, cached.Authorizer, cacheTTL, nil)
			return
		}
		fallthrough
	default:
		r.fireAsyncLegacyResult(token, acl.DenyAll(), cacheTTL, nil)
		return
	}
}

func (r *ACLResolver) resolveTokenLegacy(token string) (acl.Authorizer, error) {
	defer metrics.MeasureSince([]string{"acl", "resolveTokenLegacy"}, time.Now())

	// Attempt to resolve locally first (local results are not cached).
	// This is only useful for servers where either legacy replication is being
	// done or the server is within the primary datacenter.
	if done, identity, err := r.delegate.ResolveIdentityFromToken(token); done {
		if err == nil && identity != nil {
			policies, err := r.resolvePoliciesForIdentity(identity)
			if err != nil {
				return nil, err
			}
			return policies.Compile(acl.RootAuthorizer(r.config.ACLDefaultPolicy), r.cache, r.sentinel)
		}
		return nil, err
	}

	// Look in the cache prior to making an RPC request
	entry := r.cache.GetAuthorizer(token)
	if entry != nil && entry.Age() <= minTTL(entry.TTL, r.config.ACLTokenTTL) {
		metrics.IncrCounter([]string{"acl", "token", "cache_hit"}, 1)
		if entry.Authorizer != nil {
			return entry.Authorizer, nil
		}
		return nil, acl.ErrNotFound
	}

	metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1)

	// Resolve the token in the background and wait on the result if we must
	var waitChan chan *remoteACLLegacyResult
	waitForResult := entry == nil || r.config.ACLDownPolicy != "async-cache"

	r.asyncLegacyMutex.Lock()

	// check if resolution for this token is already happening
	waiters, ok := r.asyncLegacyResults[token]
	if !ok || waiters == nil {
		// initialize the slice of waiters if not already done
		waiters = make([]chan *remoteACLLegacyResult, 0)
	}
	if waitForResult {
		// Create the waitChan only if we are going to block waiting for the
		// response, and then append it to the list of waiters. Because we
		// will block (not select on or discard this chan) it does not need
		// to be buffered.
		waitChan = make(chan *remoteACLLegacyResult)
		r.asyncLegacyResults[token] = append(waiters, waitChan)
	}
	r.asyncLegacyMutex.Unlock()

	if !ok {
		// start the async RPC if it wasn't already ongoing
		go r.resolveTokenLegacyAsync(token, entry)
	}

	if !waitForResult {
		// waitForResult being false requires the cache entry to not be nil
		if entry.Authorizer != nil {
			return entry.Authorizer, nil
		}
		return nil, acl.ErrNotFound
	}

	// block waiting for the async RPC to finish
	res := <-waitChan
	return res.authorizer, res.err
}
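
// The bookkeeping above (and in resolveIdentityFromToken and
// resolvePoliciesForIdentity below) is a hand-rolled single-flight: the
// first caller for a key starts the RPC, later callers only register a wait
// channel. Stripped to its essentials the pattern looks like:
//
//	mu.Lock()
//	waiters, inFlight := pending[key]
//	ch := make(chan *result) // unbuffered: the sender only fires at blocked readers
//	pending[key] = append(waiters, ch)
//	mu.Unlock()
//	if !inFlight {
//		go fetch(key) // sends one result to every registered channel, then deletes pending[key]
//	}
//	res := <-ch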

// fireAsyncTokenResult is used to notify all waiters that the result of a
// token resolution is available.
func (r *ACLResolver) fireAsyncTokenResult(token string, identity structs.ACLIdentity, err error) {
	// cache the result: positive or negative
	r.cache.PutIdentity(token, identity)

	// get the list of channels to send the result to
	r.asyncIdentityResultsMutex.Lock()
	channels := r.asyncIdentityResults[token]
	delete(r.asyncIdentityResults, token)
	r.asyncIdentityResultsMutex.Unlock()

	// notify all watchers of the RPC results
	result := &remoteACLIdentityResult{identity, err}
	for _, cx := range channels {
		// cannot block because all wait chans will have another goroutine blocked on the read
		cx <- result
		close(cx)
	}
}

// resolveIdentityFromTokenAsync resolves a token to its ACLIdentity via an
// ACL.TokenRead RPC and fires the result at any registered watchers.
func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *structs.IdentityCacheEntry) {
	req := structs.ACLTokenGetRequest{
		Datacenter:  r.delegate.ACLDatacenter(false),
		TokenID:     token,
		TokenIDType: structs.ACLTokenSecret,
		QueryOptions: structs.QueryOptions{
			Token:      token,
			AllowStale: true,
		},
	}

	var resp structs.ACLTokenResponse
	err := r.delegate.RPC("ACL.TokenRead", &req, &resp)
	if err == nil {
		if resp.Token == nil {
			r.fireAsyncTokenResult(token, nil, acl.ErrNotFound)
		} else {
			r.fireAsyncTokenResult(token, resp.Token, nil)
		}
		return
	}

	if acl.IsErrNotFound(err) {
		// Make sure to remove from the cache if it was deleted
		r.fireAsyncTokenResult(token, nil, acl.ErrNotFound)
		return
	}

	// some other RPC error
	if cached != nil && (r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache") {
		// extend the cache, and return so the negative result below doesn't
		// clobber the extended entry
		r.fireAsyncTokenResult(token, cached.Identity, nil)
		return
	}

	r.fireAsyncTokenResult(token, nil, err)
}

func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentity, error) {
	// Attempt to resolve locally first (local results are not cached)
	if done, identity, err := r.delegate.ResolveIdentityFromToken(token); done {
		return identity, err
	}

	// Check the cache before making any RPC requests
	cacheEntry := r.cache.GetIdentity(token)
	if cacheEntry != nil && cacheEntry.Age() <= r.config.ACLTokenTTL {
		metrics.IncrCounter([]string{"acl", "token", "cache_hit"}, 1)
		return cacheEntry.Identity, nil
	}

	metrics.IncrCounter([]string{"acl", "token", "cache_miss"}, 1)

	// Background an RPC request and wait on it if we must
	var waitChan chan *remoteACLIdentityResult
	waitForResult := cacheEntry == nil || r.config.ACLDownPolicy != "async-cache"

	r.asyncIdentityResultsMutex.Lock()

	// check if resolution of this token is already ongoing
	waiters, ok := r.asyncIdentityResults[token]
	if !ok || waiters == nil {
		// only initialize the slice of waiters if need be (when this token resolution isn't ongoing)
		waiters = make([]chan *remoteACLIdentityResult, 0)
	}
	if waitForResult {
		// Create the waitChan only if we are going to block waiting for the
		// response, and then append it to the list of waiters. Because we
		// will block (not select on or discard this chan) it does not need
		// to be buffered.
		waitChan = make(chan *remoteACLIdentityResult)
		r.asyncIdentityResults[token] = append(waiters, waitChan)
	}
	r.asyncIdentityResultsMutex.Unlock()

	if !ok {
		// only start the RPC if one isn't in flight
		go r.resolveIdentityFromTokenAsync(token, cacheEntry)
	}

	if !waitForResult {
		// waitForResult being false requires the cacheEntry to not be nil
		return cacheEntry.Identity, nil
	}

	// block on the read here; this is why we don't need chan buffering
	res := <-waitChan
	if res.err != nil && !acl.IsErrNotFound(res.err) {
		return res.identity, ACLRemoteError{Err: res.err}
	}
	return res.identity, res.err
}

// fireAsyncPolicyResult is used to notify all waiters that policy resolution is complete.
func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACLPolicy, err error) {
	// cache the result: positive or negative
	r.cache.PutPolicy(policyID, policy)

	// get the list of channels to send the result to
	r.asyncPolicyResultsMutex.Lock()
	channels := r.asyncPolicyResults[policyID]
	delete(r.asyncPolicyResults, policyID)
	r.asyncPolicyResultsMutex.Unlock()

	// notify all watchers of the RPC results
	result := &remoteACLPolicyResult{policy, err}
	for _, cx := range channels {
		// not closing the channel, as a waiter may be registered for several
		// policy IDs and will receive more results on the same channel
		cx <- result
	}
}

// resolvePoliciesAsyncForIdentity resolves a batch of policy IDs via an
// ACL.PolicyResolve RPC and fires a result for every requested ID.
func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) {
	req := structs.ACLPolicyBatchGetRequest{
		Datacenter: r.delegate.ACLDatacenter(false),
		PolicyIDs:  policyIDs,
		QueryOptions: structs.QueryOptions{
			Token:      identity.SecretToken(),
			AllowStale: true,
		},
	}

	found := make(map[string]struct{})
	var resp structs.ACLPolicyBatchResponse
	err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp)
	if err == nil {
		for _, policy := range resp.Policies {
			r.fireAsyncPolicyResult(policy.ID, policy, nil)
			found[policy.ID] = struct{}{}
		}

		for _, policyID := range policyIDs {
			if _, ok := found[policyID]; !ok {
				r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound)
			}
		}
		return
	}

	if acl.IsErrNotFound(err) {
		for _, policyID := range policyIDs {
			// Make sure to remove from the cache if it was deleted
			r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound)
		}
		return
	}

	// other RPC error - use cache if available
	extendCache := r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache"
	for _, policyID := range policyIDs {
		if entry, ok := cached[policyID]; extendCache && ok {
			r.fireAsyncPolicyResult(policyID, entry.Policy, nil)
		} else {
			r.fireAsyncPolicyResult(policyID, nil, ACLRemoteError{Err: err})
		}
	}
}

// filterPoliciesByScope drops any policies that are scoped to a set of
// datacenters that does not include the local datacenter.
func (r *ACLResolver) filterPoliciesByScope(policies structs.ACLPolicies) structs.ACLPolicies {
	var out structs.ACLPolicies
	for _, policy := range policies {
		if len(policy.Datacenters) == 0 {
			out = append(out, policy)
			continue
		}

		for _, dc := range policy.Datacenters {
			if dc == r.config.Datacenter {
				out = append(out, policy)
				break
			}
		}
	}
	return out
}
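
// For example, with r.config.Datacenter set to "dc1", a policy carrying
// Datacenters: []string{"dc2", "dc3"} is dropped from the result, while a
// policy with an empty Datacenters list is valid everywhere and always kept.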

func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) (structs.ACLPolicies, error) {
	policyIDs := identity.PolicyIDs()
	if len(policyIDs) == 0 {
		policy := identity.EmbeddedPolicy()
		if policy != nil {
			return []*structs.ACLPolicy{policy}, nil
		}

		// In this case the default policy will be all that is in effect.
		return nil, nil
	}

	// For the new ACLs, policy replication is mandatory for correct operation
	// on servers. Therefore we only attempt to resolve policies locally.
	policies := make([]*structs.ACLPolicy, 0, len(policyIDs))

	// Get all associated policies
	var missing []string
	var expired []*structs.ACLPolicy
	expCacheMap := make(map[string]*structs.PolicyCacheEntry)

	for _, policyID := range policyIDs {
		if done, policy, err := r.delegate.ResolvePolicyFromID(policyID); done {
			if err != nil && !acl.IsErrNotFound(err) {
				return nil, err
			}

			if policy != nil {
				policies = append(policies, policy)
			} else {
				r.logger.Printf("[WARN] acl: policy %q not found for identity %q", policyID, identity.ID())
			}

			continue
		}

		// build the missing list so we can fetch all the missing policies with one RPC
		entry := r.cache.GetPolicy(policyID)
		if entry == nil {
			missing = append(missing, policyID)
			continue
		}

		if entry.Policy == nil {
			// this happens when we cache a negative response for the policy's existence
			continue
		}

		if entry.Age() >= r.config.ACLPolicyTTL {
			expired = append(expired, entry.Policy)
			expCacheMap[policyID] = entry
		} else {
			policies = append(policies, entry.Policy)
		}
	}

	// Hot path if we have no missing or expired policies
	if len(missing)+len(expired) == 0 {
		return r.filterPoliciesByScope(policies), nil
	}

	fetchIDs := missing
	for _, policy := range expired {
		fetchIDs = append(fetchIDs, policy.ID)
	}

	// Background an RPC request and wait on it if we must
	var waitChan chan *remoteACLPolicyResult
	waitForResult := len(missing) > 0 || r.config.ACLDownPolicy != "async-cache"
	if waitForResult {
		// buffered because there are going to be multiple goroutines that send data to this chan
		waitChan = make(chan *remoteACLPolicyResult, len(fetchIDs))
	}

	var newAsyncFetchIds []string
	r.asyncPolicyResultsMutex.Lock()
	for _, policyID := range fetchIDs {
		clients, ok := r.asyncPolicyResults[policyID]
		if !ok || clients == nil {
			clients = make([]chan *remoteACLPolicyResult, 0)
		}
		if waitForResult {
			r.asyncPolicyResults[policyID] = append(clients, waitChan)
		}
		if !ok {
			newAsyncFetchIds = append(newAsyncFetchIds, policyID)
		}
	}
	r.asyncPolicyResultsMutex.Unlock()

	if len(newAsyncFetchIds) > 0 {
		// only start the RPC for policies that aren't already being fetched
		go r.resolvePoliciesAsyncForIdentity(identity, newAsyncFetchIds, expCacheMap)
	}

	if !waitForResult {
		// waitForResult being false requires that all the policies were cached already
		policies = append(policies, expired...)
		return r.filterPoliciesByScope(policies), nil
	}

	// Wait for one result per fetched ID; waitChan was registered as a waiter
	// for every ID in fetchIDs, including those another request was already
	// fetching.
	for i := 0; i < len(fetchIDs); i++ {
		res := <-waitChan

		if res.err != nil {
			return nil, res.err
		}

		if res.policy != nil {
			policies = append(policies, res.policy)
		}
	}

	return r.filterPoliciesByScope(policies), nil
}
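
// Unlike the per-token resolution above, policy fetches fan in over a single
// buffered channel: one channel receives the results for every missing or
// expired policy ID, no matter which RPC (ours or one already in flight)
// produced each result. The buffer of len(fetchIDs) also ensures that no
// firing goroutine can block if this caller returns early on an error.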

func (r *ACLResolver) resolveTokenToPolicies(token string) (structs.ACLPolicies, error) {
	// Resolve the token to an ACLIdentity
	identity, err := r.resolveIdentityFromToken(token)
	if err != nil {
		return nil, err
	} else if identity == nil {
		return nil, acl.ErrNotFound
	}

	// Resolve the ACLIdentity to ACLPolicies
	return r.resolvePoliciesForIdentity(identity)
}

// disableACLsWhenUpstreamDisabled marks ACLs as locally disabled for
// ACLDisabledTTL when the given error indicates that the upstream servers
// have ACLs disabled.
func (r *ACLResolver) disableACLsWhenUpstreamDisabled(err error) error {
	if !r.autoDisable || err == nil || !acl.IsErrDisabled(err) {
		return err
	}

	r.logger.Printf("[DEBUG] acl: ACLs disabled on upstream servers, will check again after %s", r.config.ACLDisabledTTL)
	r.disabledLock.Lock()
	r.disabled = time.Now().Add(r.config.ACLDisabledTTL)
	r.disabledLock.Unlock()

	return err
}

// ResolveToken resolves a token to an acl.Authorizer. It returns a nil
// Authorizer (and a nil error) when ACLs are disabled.
func (r *ACLResolver) ResolveToken(token string) (acl.Authorizer, error) {
	if !r.ACLsEnabled() {
		return nil, nil
	}

	if acl.RootAuthorizer(token) != nil {
		return nil, acl.ErrRootDenied
	}

	// handle the anonymous token
	if token == "" {
		token = anonymousToken
	}

	if r.delegate.UseLegacyACLs() {
		authorizer, err := r.resolveTokenLegacy(token)
		return authorizer, r.disableACLsWhenUpstreamDisabled(err)
	}

	defer metrics.MeasureSince([]string{"acl", "ResolveToken"}, time.Now())

	policies, err := r.resolveTokenToPolicies(token)
	if err != nil {
		r.disableACLsWhenUpstreamDisabled(err)
		if IsACLRemoteError(err) {
			r.logger.Printf("[ERR] consul.acl: %v", err)
			return r.down, nil
		}
		return nil, err
	}

	// Build the Authorizer
	authorizer, err := policies.Compile(acl.RootAuthorizer(r.config.ACLDefaultPolicy), r.cache, r.sentinel)
	return authorizer, err
}
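
// A typical call site in an RPC endpoint looks roughly like the following
// (the service name is a placeholder):
//
//	authz, err := r.ResolveToken(args.Token)
//	if err != nil {
//		return err
//	}
//	// a nil Authorizer means ACLs are disabled entirely
//	if authz != nil && !authz.ServiceRead("web") {
//		return acl.ErrPermissionDenied
//	}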

// ACLsEnabled reports whether ACL enforcement is currently in effect, taking
// both the configuration and any auto-disable state into account.
func (r *ACLResolver) ACLsEnabled() bool {
	// Whether we desire ACLs to be enabled according to configuration
	if !r.delegate.ACLsEnabled() {
		return false
	}

	if r.autoDisable {
		// Whether ACLs are disabled according to RPCs failing with an "ACLs disabled" error
		r.disabledLock.RLock()
		defer r.disabledLock.RUnlock()
		return !time.Now().Before(r.disabled)
	}

	return true
}

// GetMergedPolicyForToken resolves the given token and merges all of its
// policies into a single acl.Policy.
func (r *ACLResolver) GetMergedPolicyForToken(token string) (*acl.Policy, error) {
	policies, err := r.resolveTokenToPolicies(token)
	if err != nil {
		return nil, err
	}
	if len(policies) == 0 {
		return nil, acl.ErrNotFound
	}

	return policies.Merge(r.cache, r.sentinel)
}

// aclFilter is used to filter results from our state store based on ACL rules
// configured for the provided token.
type aclFilter struct {
	authorizer      acl.Authorizer
	logger          *log.Logger
	enforceVersion8 bool
}

// newACLFilter constructs a new aclFilter.
func newACLFilter(authorizer acl.Authorizer, logger *log.Logger, enforceVersion8 bool) *aclFilter {
	if logger == nil {
		logger = log.New(os.Stderr, "", log.LstdFlags)
	}
	return &aclFilter{
		authorizer:      authorizer,
		logger:          logger,
		enforceVersion8: enforceVersion8,
	}
}

// allowNode is used to determine if a node is accessible for an ACL.
func (f *aclFilter) allowNode(node string) bool {
	if !f.enforceVersion8 {
		return true
	}
	return f.authorizer.NodeRead(node)
}

// allowService is used to determine if a service is accessible for an ACL.
func (f *aclFilter) allowService(service string) bool {
	if service == "" {
		return true
	}

	if !f.enforceVersion8 && service == structs.ConsulServiceID {
		return true
	}
	return f.authorizer.ServiceRead(service)
}

// allowSession is used to determine if a session for a node is accessible for
// an ACL.
func (f *aclFilter) allowSession(node string) bool {
	if !f.enforceVersion8 {
		return true
	}
	return f.authorizer.SessionRead(node)
}

// filterHealthChecks is used to filter a set of health checks down based on
// the configured ACL rules for a token.
func (f *aclFilter) filterHealthChecks(checks *structs.HealthChecks) {
	hc := *checks
	for i := 0; i < len(hc); i++ {
		check := hc[i]
		if f.allowNode(check.Node) && f.allowService(check.ServiceName) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping check %q from result due to ACLs", check.CheckID)
		hc = append(hc[:i], hc[i+1:]...)
		i--
	}
	*checks = hc
}
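
// filterHealthChecks and the other slice filters below share the same
// in-place deletion idiom: removing element i with append(s[:i], s[i+1:]...)
// and then decrementing i keeps the index aligned after the tail shifts
// left. A standalone sketch of the idiom:
//
//	s := []int{1, 2, 3, 4}
//	for i := 0; i < len(s); i++ {
//		if s[i]%2 == 0 { // drop even values
//			s = append(s[:i], s[i+1:]...)
//			i--
//		}
//	}
//	// s is now []int{1, 3}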

// filterServices is used to filter a set of services based on ACLs.
func (f *aclFilter) filterServices(services structs.Services) {
	for svc := range services {
		if f.allowService(svc) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
		delete(services, svc)
	}
}

// filterServiceNodes is used to filter a set of nodes for a given service
// based on the configured ACL rules.
func (f *aclFilter) filterServiceNodes(nodes *structs.ServiceNodes) {
	sn := *nodes
	for i := 0; i < len(sn); i++ {
		node := sn[i]
		if f.allowNode(node.Node) && f.allowService(node.ServiceName) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node.Node)
		sn = append(sn[:i], sn[i+1:]...)
		i--
	}
	*nodes = sn
}

// filterNodeServices is used to filter services on a given node based on ACLs.
func (f *aclFilter) filterNodeServices(services **structs.NodeServices) {
	if *services == nil {
		return
	}

	if !f.allowNode((*services).Node.Node) {
		*services = nil
		return
	}

	for svc := range (*services).Services {
		if f.allowService(svc) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
		delete((*services).Services, svc)
	}
}

// filterCheckServiceNodes is used to filter nodes based on ACL rules.
func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
	csn := *nodes
	for i := 0; i < len(csn); i++ {
		node := csn[i]
		if f.allowNode(node.Node.Node) && f.allowService(node.Service.Service) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node.Node.Node)
		csn = append(csn[:i], csn[i+1:]...)
		i--
	}
	*nodes = csn
}

// filterSessions is used to filter a set of sessions based on ACLs.
func (f *aclFilter) filterSessions(sessions *structs.Sessions) {
	s := *sessions
	for i := 0; i < len(s); i++ {
		session := s[i]
		if f.allowSession(session.Node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping session %q from result due to ACLs", session.ID)
		s = append(s[:i], s[i+1:]...)
		i--
	}
	*sessions = s
}

// filterCoordinates is used to filter nodes in a coordinate dump based on ACL
// rules.
func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) {
	c := *coords
	for i := 0; i < len(c); i++ {
		node := c[i].Node
		if f.allowNode(node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
		c = append(c[:i], c[i+1:]...)
		i--
	}
	*coords = c
}

// filterIntentions is used to filter intentions based on ACL rules.
// We prune entries the user doesn't have access to, and we redact any tokens
// if the user doesn't have a management token.
func (f *aclFilter) filterIntentions(ixns *structs.Intentions) {
	// Management tokens can see everything with no filtering.
	if f.authorizer.ACLRead() {
		return
	}

	// Otherwise, we need to see what the token has access to.
	ret := make(structs.Intentions, 0, len(*ixns))
	for _, ixn := range *ixns {
		// If no prefix ACL applies to this then filter it, since
		// we know at this point the user doesn't have a management
		// token, otherwise see what the policy says.
		prefix, ok := ixn.GetACLPrefix()
		if !ok || !f.authorizer.IntentionRead(prefix) {
			f.logger.Printf("[DEBUG] consul: dropping intention %q from result due to ACLs", ixn.ID)
			continue
		}

		ret = append(ret, ixn)
	}

	*ixns = ret
}

// filterNodeDump is used to filter through all parts of a node dump and
// remove elements the provided ACL token cannot access.
func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
	nd := *dump
	for i := 0; i < len(nd); i++ {
		info := nd[i]

		// Filter nodes
		if node := info.Node; !f.allowNode(node) {
			f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
			nd = append(nd[:i], nd[i+1:]...)
			i--
			continue
		}

		// Filter services
		for j := 0; j < len(info.Services); j++ {
			svc := info.Services[j].Service
			if f.allowService(svc) {
				continue
			}
			f.logger.Printf("[DEBUG] consul: dropping service %q from result due to ACLs", svc)
			info.Services = append(info.Services[:j], info.Services[j+1:]...)
			j--
		}

		// Filter checks
		for j := 0; j < len(info.Checks); j++ {
			chk := info.Checks[j]
			if f.allowService(chk.ServiceName) {
				continue
			}
			f.logger.Printf("[DEBUG] consul: dropping check %q from result due to ACLs", chk.CheckID)
			info.Checks = append(info.Checks[:j], info.Checks[j+1:]...)
			j--
		}
	}
	*dump = nd
}

// filterNodes is used to filter through all parts of a node list and remove
// elements the provided ACL token cannot access.
func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
	n := *nodes
	for i := 0; i < len(n); i++ {
		node := n[i].Node
		if f.allowNode(node) {
			continue
		}
		f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
		n = append(n[:i], n[i+1:]...)
		i--
	}
	*nodes = n
}

// redactPreparedQueryTokens will redact any tokens unless the client has a
// management token. This eases the transition to delegated authority over
// prepared queries, since it was easy to capture management tokens in Consul
// 0.6.3 and earlier, and we don't want to show those willy-nilly. This does
// have the limitation of preventing delegated non-management users from seeing
// captured tokens, but they can at least see whether or not a token is set.
func (f *aclFilter) redactPreparedQueryTokens(query **structs.PreparedQuery) {
	// Management tokens can see everything with no filtering.
	if f.authorizer.ACLWrite() {
		return
	}

	// Let the user see if there's a blank token, otherwise we need
	// to redact it, since we know they don't have a management
	// token.
	if (*query).Token != "" {
		// Redact the token, using a copy of the query structure
		// since we could be pointed at a live instance from the
		// state store so it's not safe to modify it. Note that
		// this clone will still point to things like underlying
		// arrays in the original, but for modifying just the
		// token it will be safe to use.
		clone := *(*query)
		clone.Token = redactedToken
		*query = &clone
	}
}

// filterPreparedQueries is used to filter prepared queries based on ACL rules.
// We prune entries the user doesn't have access to, and we redact any tokens
// if the user doesn't have a management token.
func (f *aclFilter) filterPreparedQueries(queries *structs.PreparedQueries) {
	// Management tokens can see everything with no filtering.
	if f.authorizer.ACLWrite() {
		return
	}

	// Otherwise, we need to see what the token has access to.
	ret := make(structs.PreparedQueries, 0, len(*queries))
	for _, query := range *queries {
		// If no prefix ACL applies to this query then filter it, since
		// we know at this point the user doesn't have a management
		// token, otherwise see what the policy says.
		prefix, ok := query.GetACLPrefix()
		if !ok || !f.authorizer.PreparedQueryRead(prefix) {
			f.logger.Printf("[DEBUG] consul: dropping prepared query %q from result due to ACLs", query.ID)
			continue
		}

		// Redact any tokens if necessary. We make a copy of just the
		// pointer so we don't mess with the caller's slice.
		final := query
		f.redactPreparedQueryTokens(&final)
		ret = append(ret, final)
	}
	*queries = ret
}

// redactTokenSecret replaces the SecretID of the given token with the
// redaction placeholder unless the client has acl "write" privileges.
func (f *aclFilter) redactTokenSecret(token **structs.ACLToken) {
	if token == nil || *token == nil || f == nil || f.authorizer.ACLWrite() {
		return
	}
	clone := *(*token)
	clone.SecretID = redactedToken
	*token = &clone
}

// redactTokenSecrets applies redactTokenSecret to every token in the list.
func (f *aclFilter) redactTokenSecrets(tokens *structs.ACLTokens) {
	ret := make(structs.ACLTokens, 0, len(*tokens))
	for _, token := range *tokens {
		final := token
		f.redactTokenSecret(&final)
		ret = append(ret, final)
	}
	*tokens = ret
}

// filterACLWithAuthorizer dispatches the subject to the appropriate filter
// method based on its concrete type.
func (r *ACLResolver) filterACLWithAuthorizer(authorizer acl.Authorizer, subj interface{}) error {
	if authorizer == nil {
		return nil
	}
	// Create the filter
	filt := newACLFilter(authorizer, r.logger, r.config.ACLEnforceVersion8)

	switch v := subj.(type) {
	case *structs.CheckServiceNodes:
		filt.filterCheckServiceNodes(v)

	case *structs.IndexedCheckServiceNodes:
		filt.filterCheckServiceNodes(&v.Nodes)

	case *structs.IndexedCoordinates:
		filt.filterCoordinates(&v.Coordinates)

	case *structs.IndexedHealthChecks:
		filt.filterHealthChecks(&v.HealthChecks)

	case *structs.IndexedIntentions:
		filt.filterIntentions(&v.Intentions)

	case *structs.IndexedNodeDump:
		filt.filterNodeDump(&v.Dump)

	case *structs.IndexedNodes:
		filt.filterNodes(&v.Nodes)

	case *structs.IndexedNodeServices:
		filt.filterNodeServices(&v.NodeServices)

	case *structs.IndexedServiceNodes:
		filt.filterServiceNodes(&v.ServiceNodes)

	case *structs.IndexedServices:
		filt.filterServices(v.Services)

	case *structs.IndexedSessions:
		filt.filterSessions(&v.Sessions)

	case *structs.IndexedPreparedQueries:
		filt.filterPreparedQueries(&v.Queries)

	case **structs.PreparedQuery:
		filt.redactPreparedQueryTokens(v)

	case *structs.ACLTokens:
		filt.redactTokenSecrets(v)

	case **structs.ACLToken:
		filt.redactTokenSecret(v)

	default:
		panic(fmt.Errorf("Unhandled type passed to ACL filter: %#v", subj))
	}

	return nil
}

// filterACL is used to filter results from our service catalog based on the
// rules configured for the provided token.
func (r *ACLResolver) filterACL(token string, subj interface{}) error {
	// Get the ACL from the token
	authorizer, err := r.ResolveToken(token)
	if err != nil {
		return err
	}

	// Fast path if ACLs are not enabled
	if authorizer == nil {
		return nil
	}

	return r.filterACLWithAuthorizer(authorizer, subj)
}
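
// Endpoint handlers are expected to call filterACL on a reply after loading
// it from the state store, e.g. (a sketch, with args and reply standing in
// for an endpoint's request and an ACL-filterable reply type):
//
//	if err := r.filterACL(args.Token, reply); err != nil {
//		return err
//	}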

// vetRegisterWithACL applies the given ACL's policy to the catalog update and
// determines if it is allowed. Since the catalog register request is so
// dynamic, this is a pretty complex algorithm and was worth breaking out of the
// endpoint. The NodeServices record for the node must be supplied, and can be
// nil.
//
// This is a bit racy because we have to check the state store outside of a
// transaction. It's the best we can do because we don't want to flow ACL
// checking down there. The node information doesn't change in practice, so this
// will be fine. If we expose ways to change node addresses in a later version,
// then we should split the catalog API at the node and service level so we can
// address this race better (even then it would be super rare, and would at
// worst let a service update revert a recent node update, so it doesn't open up
// too much abuse).
func vetRegisterWithACL(rule acl.Authorizer, subj *structs.RegisterRequest,
	ns *structs.NodeServices) error {
	// Fast path if ACLs are not enabled.
	if rule == nil {
		return nil
	}

	// This gets called potentially from a few spots so we save it and
	// return the structure we made if we have it.
	var memo map[string]interface{}
	scope := func() map[string]interface{} {
		if memo != nil {
			return memo
		}

		node := &api.Node{
			ID:              string(subj.ID),
			Node:            subj.Node,
			Address:         subj.Address,
			Datacenter:      subj.Datacenter,
			TaggedAddresses: subj.TaggedAddresses,
			Meta:            subj.NodeMeta,
		}

		var service *api.AgentService
		if subj.Service != nil {
			service = &api.AgentService{
				ID:                subj.Service.ID,
				Service:           subj.Service.Service,
				Tags:              subj.Service.Tags,
				Meta:              subj.Service.Meta,
				Address:           subj.Service.Address,
				Port:              subj.Service.Port,
				EnableTagOverride: subj.Service.EnableTagOverride,
			}
		}

		memo = sentinel.ScopeCatalogUpsert(node, service)
		return memo
	}

	// Vet the node info. This allows service updates to re-post the required
	// node info for each request without having to have node "write"
	// privileges.
	needsNode := ns == nil || subj.ChangesNode(ns.Node)
	if needsNode && !rule.NodeWrite(subj.Node, scope) {
		return acl.ErrPermissionDenied
	}

	// Vet the service change. This includes making sure they can register
	// the given service, and that we can write to any existing service that
	// is being modified by id (if any).
	if subj.Service != nil {
		if !rule.ServiceWrite(subj.Service.Service, scope) {
			return acl.ErrPermissionDenied
		}

		if ns != nil {
			other, ok := ns.Services[subj.Service.ID]

			// This is effectively a delete, so we DO NOT apply the
			// sentinel scope to the service we are overwriting, just
			// the regular ACL policy.
			if ok && !rule.ServiceWrite(other.Service, nil) {
				return acl.ErrPermissionDenied
			}
		}
	}

	// Make sure that the member was flattened before we got there. This
	// keeps us from having to verify this check as well.
	if subj.Check != nil {
		return fmt.Errorf("check member must be nil")
	}

	// Vet the checks. Node-level checks require node write, and
	// service-level checks require service write.
	for _, check := range subj.Checks {
		// Make sure that the node matches - we don't allow you to mix
		// checks from other nodes because we'd have to pull a bunch
		// more state store data to check this. If ACLs are enabled then
		// we simply require them to match in a given request. There's a
		// note in state_store.go to ban this down there in Consul 0.8,
		// but it's good to leave this here because it's required for
		// correctness wrt. ACLs.
		if check.Node != subj.Node {
			return fmt.Errorf("Node '%s' for check '%s' doesn't match register request node '%s'",
				check.Node, check.CheckID, subj.Node)
		}

		// Node-level check.
		if check.ServiceID == "" {
			if !rule.NodeWrite(subj.Node, scope) {
				return acl.ErrPermissionDenied
			}
			continue
		}

		// Service-level check: handle the common case where it matches the
		// service part of this request, which has already been vetted above
		// and might be being registered along with its checks.
		if subj.Service != nil && subj.Service.ID == check.ServiceID {
			continue
		}

		// Service-level check for some other service. Make sure they've
		// got write permissions for that service.
		if ns == nil {
			return fmt.Errorf("Unknown service '%s' for check '%s'", check.ServiceID, check.CheckID)
		}

		other, ok := ns.Services[check.ServiceID]
		if !ok {
			return fmt.Errorf("Unknown service '%s' for check '%s'", check.ServiceID, check.CheckID)
		}

		// We are only adding a check here, so we don't add the scope,
		// since the sentinel policy doesn't apply to adding checks at
		// this time.
		if !rule.ServiceWrite(other.Service, nil) {
			return acl.ErrPermissionDenied
		}
	}

	return nil
}
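
// For example, re-registering a service instance on an existing node only
// needs service "write": when the request repeats the node's current info,
// ChangesNode reports false and the node-level check is skipped. A sketch of
// the call (authz, req and ns are placeholders):
//
//	if err := vetRegisterWithACL(authz, req, ns); err != nil {
//		// acl.ErrPermissionDenied, or a validation error for mismatched checks
//	}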

// vetDeregisterWithACL applies the given ACL's policy to the catalog update and
// determines if it is allowed. Since the catalog deregister request is so
// dynamic, this is a pretty complex algorithm and was worth breaking out of the
// endpoint. The NodeService for the referenced service must be supplied, and can
// be nil; similarly for the HealthCheck for the referenced health check.
func vetDeregisterWithACL(rule acl.Authorizer, subj *structs.DeregisterRequest,
	ns *structs.NodeService, nc *structs.HealthCheck) error {
	// Fast path if ACLs are not enabled.
	if rule == nil {
		return nil
	}

	// We don't apply sentinel in this path, since at this time sentinel
	// only applies to create and update operations.

	// This order must match the code in applyRegister() in fsm.go since it
	// also evaluates things in this order, and will ignore fields based on
	// this precedence. This lets us also ignore them from an ACL perspective.
	if subj.ServiceID != "" {
		if ns == nil {
			return fmt.Errorf("Unknown service '%s'", subj.ServiceID)
		}
		if !rule.ServiceWrite(ns.Service, nil) {
			return acl.ErrPermissionDenied
		}
	} else if subj.CheckID != "" {
		if nc == nil {
			return fmt.Errorf("Unknown check '%s'", subj.CheckID)
		}
		if nc.ServiceID != "" {
			if !rule.ServiceWrite(nc.ServiceName, nil) {
				return acl.ErrPermissionDenied
			}
		} else {
			if !rule.NodeWrite(subj.Node, nil) {
				return acl.ErrPermissionDenied
			}
		}
	} else {
		if !rule.NodeWrite(subj.Node, nil) {
			return acl.ErrPermissionDenied
		}
	}

	return nil
}
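
// The precedence here mirrors applyRegister: a ServiceID takes priority over
// a CheckID, which takes priority over a bare node deregistration. A request
// carrying both a ServiceID and a CheckID is therefore vetted purely as a
// service deregistration, just as the FSM would ignore the check field.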