Activates fallback to replicated ACLs.

This commit is contained in:
James Phillips 2016-08-03 17:01:32 -07:00
parent c4ae68010b
commit 796933b45b
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 107 additions and 56 deletions

View File

@ -8,7 +8,7 @@ import (
)
// FaultFunc is a function used to fault in the parent,
// rules for an ACL given it's ID
// rules for an ACL given its ID
type FaultFunc func(id string) (string, string, error)
// aclEntry allows us to store the ACL with it's policy ID
@ -46,7 +46,7 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) {
// GetPolicy is used to get a potentially cached policy set.
// If not cached, it will be parsed, and then cached.
func (c *Cache) GetPolicy(rules string) (*Policy, error) {
return c.getPolicy(c.ruleID(rules), rules)
return c.getPolicy(RuleID(rules), rules)
}
// getPolicy is an internal method to get a cached policy,
@ -66,8 +66,8 @@ func (c *Cache) getPolicy(id, rules string) (*Policy, error) {
}
// ruleID is used to generate an ID for a rule
func (c *Cache) ruleID(rules string) string {
// RuleID is used to generate an ID for a rule
func RuleID(rules string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(rules)))
}
@ -112,7 +112,7 @@ func (c *Cache) GetACL(id string) (ACL, error) {
if err != nil {
return nil, err
}
ruleID := c.ruleID(rules)
ruleID := RuleID(rules)
// Check for a compiled ACL
policyID := c.policyID(parentID, ruleID)

View File

@ -44,7 +44,7 @@ var (
permissionDeniedErr = errors.New(permissionDenied)
)
// aclCacheEntry is used to cache non-authoritative ACL's
// aclCacheEntry is used to cache non-authoritative ACLs
// If non-authoritative, then we must respect a TTL
type aclCacheEntry struct {
ACL acl.ACL
@ -52,9 +52,14 @@ type aclCacheEntry struct {
ETag string
}
// aclFault is used to fault in the rules for an ACL if we take a miss
func (s *Server) aclFault(id string) (string, string, error) {
// aclLocalFault is used by the authoritative ACL cache to fault in the rules
// for an ACL if we take a miss. This goes directly to the state store, so it
// assumes its running in the ACL datacenter, or in a non-ACL datacenter when
// using its replicated ACLs during an outage.
func (s *Server) aclLocalFault(id string) (string, string, error) {
defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
// Query the state store.
state := s.fsm.State()
_, acl, err := state.ACLGet(id)
if err != nil {
@ -64,19 +69,23 @@ func (s *Server) aclFault(id string) (string, string, error) {
return "", "", errors.New(aclNotFound)
}
// Management tokens have no policy and inherit from the
// 'manage' root policy
// Management tokens have no policy and inherit from the 'manage' root
// policy.
if acl.Type == structs.ACLTypeManagement {
return "manage", "", nil
}
// Otherwise use the base policy
// Otherwise use the default policy.
return s.config.ACLDefaultPolicy, acl.Rules, nil
}
// resolveToken is used to resolve an ACL if any is appropriate
// resolveToken is the primary interface used by ACL-checkers (such as an
// endpoint handling a request) to resolve a token. If ACLs aren't enabled
// then this will return a nil token, otherwise it will attempt to use local
// cache and ultimately the ACL datacenter to get the policy associated with the
// token.
func (s *Server) resolveToken(id string) (acl.ACL, error) {
// Check if there is no ACL datacenter (ACL's disabled)
// Check if there is no ACL datacenter (ACLs disabled)
authDC := s.config.ACLDatacenter
if len(authDC) == 0 {
return nil, nil
@ -108,23 +117,30 @@ type aclCache struct {
config *Config
logger *log.Logger
// acls is a non-authoritative ACL cache
// acls is a non-authoritative ACL cache.
acls *lru.Cache
// aclPolicyCache is a policy cache
// aclPolicyCache is a non-authoritative policy cache.
policies *lru.Cache
// The RPC function used to talk to the client/server
// rpc is a function used to talk to the client/server.
rpc rpcFn
// local is a function used to look for an ACL locally if replication is
// enabled. This will be nil if replication isn't enabled.
local acl.FaultFunc
}
// newAclCache returns a new cache layer for ACLs and policies
func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) {
// newAclCache returns a new non-authoritative cache for ACLs. This is used for
// performance, and is used inside the ACL datacenter on non-leader servers, and
// outside the ACL datacenter everywhere.
func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFunc) (*aclCache, error) {
var err error
cache := &aclCache{
config: conf,
logger: logger,
rpc: rpc,
local: local,
}
// Initialize the non-authoritative ACL cache
@ -142,17 +158,16 @@ func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error)
return cache, nil
}
// lookupACL is used when we are non-authoritative, and need
// to resolve an ACL
// lookupACL is used when we are non-authoritative, and need to resolve an ACL.
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check the cache for the ACL
// Check the cache for the ACL.
var cached *aclCacheEntry
raw, ok := c.acls.Get(id)
if ok {
cached = raw.(*aclCacheEntry)
}
// Check for live cache
// Check for live cache.
if cached != nil && time.Now().Before(cached.Expires) {
metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
return cached.ACL, nil
@ -160,7 +175,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
}
// Attempt to refresh the policy
// Attempt to refresh the policy from the ACL datacenter via an RPC.
args := structs.ACLPolicyRequest{
Datacenter: authDC,
ACL: id,
@ -168,29 +183,62 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
if cached != nil {
args.ETag = cached.ETag
}
var out structs.ACLPolicy
err := c.rpc("ACL.GetPolicy", &args, &out)
// Handle the happy path
var reply structs.ACLPolicy
err := c.rpc("ACL.GetPolicy", &args, &reply)
if err == nil {
return c.useACLPolicy(id, authDC, cached, &out)
return c.useACLPolicy(id, authDC, cached, &reply)
}
// Check for not-found
// Check for not-found, which will cause us to bail immediately. For any
// other error we report it in the logs but can continue.
if strings.Contains(err.Error(), aclNotFound) {
return nil, errors.New(aclNotFound)
} else {
s := id
// Print last 3 chars of the token if long enough, otherwise completly hide it
if len(s) > 3 {
s = fmt.Sprintf("token ending in '%s'", s[len(s)-3:])
} else {
s = redactedToken
}
c.logger.Printf("[ERR] consul.acl: Failed to get policy for %s: %v", s, err)
// TODO (slackpad) - We used to print a few characters of the
// token here if the token was long enough. This was bugging me
// so I deleted it. We should probably print a hash of the token,
// or better yet let's add another ID to tokens to identify them
// without giving away their privileges.
c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
}
// Unable to refresh, apply the down policy
// At this point we might have an expired cache entry and we know that
// there was a problem getting the ACL from the ACL datacenter. If a
// local ACL fault function is registered to query replicated ACL data,
// and the user's policy allows it, we will try locally before we give
// up.
if c.local != nil && c.config.ACLDownPolicy == "extend-cache" {
parent, rules, err := c.local(id)
if err != nil {
// We don't make an exception here for ACLs that aren't
// found locally. It seems more robust to use an expired
// cached entry (if we have one) rather than ignore it
// for the case that replication was a bit behind and
// didn't have the ACL yet.
c.logger.Printf("[DEBUG] consul.acl: Failed to get policy from replicated ACLs: %v", err)
goto ACL_DOWN
}
policy, err := acl.Parse(rules)
if err != nil {
c.logger.Printf("[DEBUG] consul.acl: Failed to parse policy for replicated ACL: %v", err)
goto ACL_DOWN
}
policy.ID = acl.RuleID(rules)
// Fake up an ACL datacenter reply and inject it into the cache.
// Note we use the local TTL here, so this'll be used for that
// amount of time even once the ACL datacenter becomes available.
metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1)
reply.ETag = makeACLETag(parent, policy)
reply.TTL = c.config.ACLTTL
reply.Parent = parent
reply.Policy = policy
return c.useACLPolicy(id, authDC, cached, &reply)
}
ACL_DOWN:
// Unable to refresh, apply the down policy.
switch c.config.ACLDownPolicy {
case "allow":
return acl.AllowAll(), nil

View File

@ -161,6 +161,11 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
})
}
// makeACLETag returns an ETag for the given parent and policy.
func makeACLETag(parent string, policy *acl.Policy) string {
return fmt.Sprintf("%s:%s", parent, policy.ID)
}
// GetPolicy is used to retrieve a compiled policy object with a TTL. Does not
// support a blocking query.
func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error {
@ -181,7 +186,7 @@ func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy
// Generate an ETag
conf := a.srv.config
etag := fmt.Sprintf("%s:%s", parent, policy.ID)
etag := makeACLETag(parent, policy)
// Setup the response
reply.ETag = etag

View File

@ -179,6 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
time.Sleep(1*time.Second - elapsed)
ops, start = 0, time.Now()
}
ops++
// Note that we are using the single ACL interface here and not
// performing all this inside a single transaction. This is OK
@ -192,7 +193,6 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
if err := aclApplyInternal(s, change, &reply); err != nil {
return err
}
ops++
}
return nil
}
@ -232,6 +232,8 @@ func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
lastRemoteIndex = 0
}
// Calculate the changes required to bring the state into sync and then
// apply them.
changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
if err := s.updateLocalACLs(changes); err != nil {
return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
@ -252,8 +254,6 @@ func (s *Server) IsACLReplicationEnabled() bool {
// runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() {
defer s.shutdownWait.Done()
// Give each server's replicator a random initial phase for good
// measure.
select {

View File

@ -67,7 +67,7 @@ const (
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
// aclAuthCache is the authoritative ACL cache
// aclAuthCache is the authoritative ACL cache.
aclAuthCache *acl.Cache
// aclCache is the non-authoritative ACL cache.
@ -151,15 +151,10 @@ type Server struct {
// shutdown and the associated members here are used in orchestrating
// a clean shutdown. The shutdownCh is never written to, only closed to
// indicate a shutdown has been initiated. The shutdownWait group will
// be waited on after closing the shutdownCh, but before any other
// shutdown activities take place in the server. Anything added to the
// shutdownWait group will block all the rest of shutdown, so use this
// sparingly and carefully.
// indicate a shutdown has been initiated.
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownWait sync.WaitGroup
}
// Holds the RPC endpoints
@ -236,16 +231,21 @@ func NewServer(config *Config) (*Server, error) {
}
// Initialize the authoritative ACL cache.
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault)
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
return nil, fmt.Errorf("Failed to create authoritative ACL cache: %v", err)
}
// Set up the non-authoritative ACL cache.
if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
// Set up the non-authoritative ACL cache. A nil local function is given
// if ACL replication isn't enabled.
var local acl.FaultFunc
if s.IsACLReplicationEnabled() {
local = s.aclLocalFault
}
if s.aclCache, err = newAclCache(config, logger, s.RPC, local); err != nil {
s.Shutdown()
return nil, err
return nil, fmt.Errorf("Failed to create non-authoritative ACL cache: %v", err)
}
// Initialize the RPC layer.
@ -280,7 +280,6 @@ func NewServer(config *Config) (*Server, error) {
// Start ACL replication.
if s.IsACLReplicationEnabled() {
s.shutdownWait.Add(1)
go s.runACLReplication()
}
@ -509,7 +508,6 @@ func (s *Server) Shutdown() error {
s.shutdown = true
close(s.shutdownCh)
s.shutdownWait.Wait()
if s.serfLAN != nil {
s.serfLAN.Shutdown()