Only send one single ACL cache refresh across network when TTL is over

It will allow the following:

 * when connectivity is limited (saturated linnks between DCs), only one
   single request to refresh an ACL will be sent to ACL master DC instead
   of statcking ACL refresh queries
 * when extend-cache is used for ACL, do not wait for result, but refresh
   the ACL asynchronously, so no delay is not impacting slave DC
 * When extend-cache is not used, keep the existing blocking mechanism,
   but only send a single refresh request.

This will fix https://github.com/hashicorp/consul/issues/3524
This commit is contained in:
Pierre Souchay 2018-06-28 09:06:14 +02:00
parent 85baa70294
commit da9c91fd3d
1 changed files with 72 additions and 7 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"sync"
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
@ -116,6 +117,9 @@ type aclCache struct {
// local is a function used to look for an ACL locally if replication is // local is a function used to look for an ACL locally if replication is
// enabled. This will be nil if replication isn't enabled. // enabled. This will be nil if replication isn't enabled.
local acl.FaultFunc local acl.FaultFunc
fetchMutex sync.RWMutex
fetchMap map[string][]chan (RemoteACLResult)
} }
// newACLCache returns a new non-authoritative cache for ACLs. This is used for // newACLCache returns a new non-authoritative cache for ACLs. This is used for
@ -146,6 +150,11 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun
return cache, nil return cache, nil
} }
type RemoteACLResult struct {
result acl.ACL
err error
}
// lookupACL is used when we are non-authoritative, and need to resolve an ACL. // lookupACL is used when we are non-authoritative, and need to resolve an ACL.
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check the cache for the ACL. // Check the cache for the ACL.
@ -161,8 +170,22 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
return cached.ACL, nil return cached.ACL, nil
} }
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1) metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
res := c.lookupACLRemote(id, authDC, cached)
return res.result, res.err
}
// Attempt to refresh the policy from the ACL datacenter via an RPC. func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) {
c.fetchMutex.Lock()
channels := c.fetchMap[id]
delete(c.fetchMap, id)
c.fetchMutex.Unlock()
for _, cx := range channels {
cx <- RemoteACLResult{theACL, err}
close(cx)
}
}
func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) {
args := structs.ACLPolicyRequest{ args := structs.ACLPolicyRequest{
Datacenter: authDC, Datacenter: authDC,
ACL: id, ACL: id,
@ -173,13 +196,21 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
var reply structs.ACLPolicy var reply structs.ACLPolicy
err := c.rpc("ACL.GetPolicy", &args, &reply) err := c.rpc("ACL.GetPolicy", &args, &reply)
if err == nil { if err == nil {
return c.useACLPolicy(id, authDC, cached, &reply) theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
if cached != nil && theACL != nil {
cached.ACL = theACL
cached.ETag = reply.ETag
cached.Expires = time.Now().Add(c.config.ACLTTL)
}
c.fireResult(id, theACL, theError)
return
} }
// Check for not-found, which will cause us to bail immediately. For any // Check for not-found, which will cause us to bail immediately. For any
// other error we report it in the logs but can continue. // other error we report it in the logs but can continue.
if acl.IsErrNotFound(err) { if acl.IsErrNotFound(err) {
return nil, acl.ErrNotFound c.fireResult(id, nil, acl.ErrNotFound)
return
} }
c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
@ -227,24 +258,58 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
reply.TTL = c.config.ACLTTL reply.TTL = c.config.ACLTTL
reply.Parent = parent reply.Parent = parent
reply.Policy = policy reply.Policy = policy
return c.useACLPolicy(id, authDC, cached, &reply) theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
if cached != nil && theACL != nil {
cached.ACL = theACL
cached.ETag = reply.ETag
cached.Expires = time.Now().Add(c.config.ACLTTL)
}
c.fireResult(id, theACL, theError)
return
} }
ACL_DOWN: ACL_DOWN:
// Unable to refresh, apply the down policy. // Unable to refresh, apply the down policy.
switch c.config.ACLDownPolicy { switch c.config.ACLDownPolicy {
case "allow": case "allow":
return acl.AllowAll(), nil c.fireResult(id, acl.AllowAll(), nil)
return
case "extend-cache": case "extend-cache":
if cached != nil { if cached != nil {
return cached.ACL, nil c.fireResult(id, cached.ACL, nil)
return
} }
fallthrough fallthrough
default: default:
return acl.DenyAll(), nil c.fireResult(id, acl.DenyAll(), nil)
return
} }
} }
func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult {
// Attempt to refresh the policy from the ACL datacenter via an RPC.
myChan := make(chan RemoteACLResult)
mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "extend-cache"
c.fetchMutex.Lock()
clients, ok := c.fetchMap[id]
if !ok {
clients = make([]chan RemoteACLResult, 16)
}
if mustWaitForResult {
c.fetchMap[id] = append(clients, myChan)
}
c.fetchMutex.Unlock()
if !ok {
go c.loadACLInChan(id, authDC, cached)
}
if !mustWaitForResult {
return RemoteACLResult{cached.ACL, nil}
}
res := <-myChan
return res
}
// useACLPolicy handles an ACLPolicy response // useACLPolicy handles an ACLPolicy response
func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) { func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
// Check if we can used the cached policy // Check if we can used the cached policy