From da9c91fd3df6d88f778c84e1bffa58b80750d7f6 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Thu, 28 Jun 2018 09:06:14 +0200
Subject: [PATCH] Only send one single ACL cache refresh across network when
TTL is over
It will allow the following:
* when connectivity is limited (saturated links between DCs), only one
single request to refresh an ACL will be sent to ACL master DC instead
of stacking ACL refresh queries
* when extend-cache is used for ACL, do not wait for result, but refresh
the ACL asynchronously, so no delay impacts the slave DC
* When extend-cache is not used, keep the existing blocking mechanism,
but only send a single refresh request.
This will fix https://github.com/hashicorp/consul/issues/3524
---
agent/consul/acl.go | 79 +++++++++++++++++++++++++++++++++++++++++----
1 file changed, 72 insertions(+), 7 deletions(-)
diff --git a/agent/consul/acl.go b/agent/consul/acl.go
index ce3282b40..0bf1dbb4c 100644
--- a/agent/consul/acl.go
+++ b/agent/consul/acl.go
@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
+ "sync"
"time"
"github.com/armon/go-metrics"
@@ -116,6 +117,9 @@ type aclCache struct {
// local is a function used to look for an ACL locally if replication is
// enabled. This will be nil if replication isn't enabled.
local acl.FaultFunc
+
+ fetchMutex sync.RWMutex
+ fetchMap map[string][]chan (RemoteACLResult)
}
// newACLCache returns a new non-authoritative cache for ACLs. This is used for
@@ -146,6 +150,11 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun
return cache, nil
}
+// RemoteACLResult is the outcome of a remote ACL lookup. It is the element
+// type of the channels stored in aclCache.fetchMap and the return type of
+// lookupACLRemote.
+type RemoteACLResult struct {
+ // result is the resolved ACL handed to the caller alongside err.
+ result acl.ACL
+ // err is the error handed to the caller alongside result.
+ err error
+}
+
// lookupACL is used when we are non-authoritative, and need to resolve an ACL.
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check the cache for the ACL.
@@ -161,8 +170,22 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
return cached.ACL, nil
}
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
+ res := c.lookupACLRemote(id, authDC, cached)
+ return res.result, res.err
+}
- // Attempt to refresh the policy from the ACL datacenter via an RPC.
+// fireResult publishes the outcome of a lookup for the ACL `id` to every
+// channel currently registered for that id in fetchMap. The registration is
+// removed while fetchMutex is held; the sends happen after the lock has been
+// released. Each channel receives exactly one RemoteACLResult and is then
+// closed.
+func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) {
+	// Detach the waiter list under the lock so that later lookups for the
+	// same id no longer find an entry in fetchMap.
+	c.fetchMutex.Lock()
+	waiters := c.fetchMap[id]
+	delete(c.fetchMap, id)
+	c.fetchMutex.Unlock()
+
+	for i := range waiters {
+		waiters[i] <- RemoteACLResult{theACL, err}
+		close(waiters[i])
+	}
+}
+
+func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) {
args := structs.ACLPolicyRequest{
Datacenter: authDC,
ACL: id,
@@ -173,13 +196,21 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
var reply structs.ACLPolicy
err := c.rpc("ACL.GetPolicy", &args, &reply)
if err == nil {
- return c.useACLPolicy(id, authDC, cached, &reply)
+ theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
+ if cached != nil && theACL != nil {
+ cached.ACL = theACL
+ cached.ETag = reply.ETag
+ cached.Expires = time.Now().Add(c.config.ACLTTL)
+ }
+ c.fireResult(id, theACL, theError)
+ return
}
// Check for not-found, which will cause us to bail immediately. For any
// other error we report it in the logs but can continue.
if acl.IsErrNotFound(err) {
- return nil, acl.ErrNotFound
+ c.fireResult(id, nil, acl.ErrNotFound)
+ return
}
c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
@@ -227,24 +258,58 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
reply.TTL = c.config.ACLTTL
reply.Parent = parent
reply.Policy = policy
- return c.useACLPolicy(id, authDC, cached, &reply)
+ theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
+ if cached != nil && theACL != nil {
+ cached.ACL = theACL
+ cached.ETag = reply.ETag
+ cached.Expires = time.Now().Add(c.config.ACLTTL)
+ }
+ c.fireResult(id, theACL, theError)
+ return
}
ACL_DOWN:
// Unable to refresh, apply the down policy.
switch c.config.ACLDownPolicy {
case "allow":
- return acl.AllowAll(), nil
+ c.fireResult(id, acl.AllowAll(), nil)
+ return
case "extend-cache":
if cached != nil {
- return cached.ACL, nil
+ c.fireResult(id, cached.ACL, nil)
+ return
}
fallthrough
default:
- return acl.DenyAll(), nil
+ c.fireResult(id, acl.DenyAll(), nil)
+ return
}
}
+// lookupACLRemote resolves the ACL `id` by sharing a refresh with the other
+// callers registered in fetchMap for the same id. A loadACLInChan goroutine
+// is started only when no entry for id exists in fetchMap; otherwise the
+// caller just registers a channel and waits.
+//
+// When a cached entry exists and the down policy is "extend-cache", the
+// cached ACL is returned immediately without waiting for the refresh. In
+// every other case the call blocks until fireResult delivers a result on the
+// caller's channel.
+func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult {
+	// Attempt to refresh the policy from the ACL datacenter via an RPC.
+	myChan := make(chan RemoteACLResult)
+	mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "extend-cache"
+
+	c.fetchMutex.Lock()
+	if c.fetchMap == nil {
+		// Assigning into a nil map panics; create it lazily so a cache whose
+		// constructor did not initialise fetchMap still works.
+		c.fetchMap = make(map[string][]chan RemoteACLResult)
+	}
+	clients, ok := c.fetchMap[id]
+	if !ok {
+		// Length 0, capacity 16. A non-zero length would pre-fill the slice
+		// with nil channels, and a send on a nil channel blocks forever,
+		// which would hang fireResult before it reaches any real waiter.
+		clients = make([]chan RemoteACLResult, 0, 16)
+	}
+	if mustWaitForResult {
+		c.fetchMap[id] = append(clients, myChan)
+	}
+	c.fetchMutex.Unlock()
+
+	// NOTE(review): callers that do not wait never register in fetchMap, so
+	// each of them sees !ok and starts its own refresh - confirm intended.
+	if !ok {
+		go c.loadACLInChan(id, authDC, cached)
+	}
+	if !mustWaitForResult {
+		return RemoteACLResult{cached.ACL, nil}
+	}
+	return <-myChan
+}
+
// useACLPolicy handles an ACLPolicy response
func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
// Check if we can used the cached policy