commit 22c5951ec4
Merge pull request #4303 from pierresouchay/non_blocking_acl

Only send one single ACL cache refresh across network when TTL is over
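The core idea of the change: when a token's cached ACL has expired, the first caller starts a single refresh RPC to the ACL datacenter and every other concurrent caller waits for (or, under the new "async-cache" down policy, skips waiting for) that one result, instead of each caller firing its own RPC. As a rough standalone illustration of this coalescing pattern (the names and types below are hypothetical and simplified, not Consul's internal API; the real implementation is the fetchMap/fireResult/lookupACLRemote code in the diff below), consider:

// Standalone sketch of "one fetch per token" coalescing. Illustrative only.
package main

import (
	"fmt"
	"sync"
	"time"
)

type result struct {
	policy string
	err    error
}

type coalescingCache struct {
	mu      sync.Mutex
	waiters map[string][]chan result // token ID -> callers waiting on a fetch
	fetches int                      // counts simulated remote RPCs
}

// lookup returns the policy for id. If a fetch for id is already in flight,
// the caller just waits for that fetch instead of issuing its own.
func (c *coalescingCache) lookup(id string) result {
	ch := make(chan result, 1)

	c.mu.Lock()
	_, inFlight := c.waiters[id]
	c.waiters[id] = append(c.waiters[id], ch)
	c.mu.Unlock()

	if !inFlight {
		go c.fetch(id) // first caller triggers the single remote fetch
	}
	return <-ch
}

// fetch simulates one slow remote lookup and fans the result out to every
// caller registered while it was running.
func (c *coalescingCache) fetch(id string) {
	c.mu.Lock()
	c.fetches++
	c.mu.Unlock()

	time.Sleep(50 * time.Millisecond) // pretend this is a cross-DC RPC
	res := result{policy: "allow"}

	c.mu.Lock()
	chans := c.waiters[id]
	delete(c.waiters, id)
	c.mu.Unlock()

	for _, ch := range chans {
		ch <- res
		close(ch)
	}
}

func main() {
	c := &coalescingCache{waiters: make(map[string][]chan result)}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.lookup("token-1")
		}()
	}
	wg.Wait()

	c.mu.Lock()
	n := c.fetches
	c.mu.Unlock()
	fmt.Printf("100 concurrent lookups, %d remote fetch(es)\n", n)
}

Run directly, the sketch typically prints "100 concurrent lookups, 1 remote fetch(es)": one slow cross-datacenter fetch is shared by every caller that arrives while it is in flight.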
@@ -158,7 +158,7 @@ test: other-consul dev-build vet
 	@# hide it from travis as it exceeds their log limits and causes job to be
 	@# terminated (over 4MB and over 10k lines in the UI). We need to output
 	@# _something_ to stop them terminating us due to inactivity...
-	{ go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 5m $(GOTEST_PKGS) 2>&1 ; echo $$? > exit-code ; } | tee test.log | egrep '^(ok|FAIL)\s*github.com/hashicorp/consul'
+	{ go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m $(GOTEST_PKGS) 2>&1 ; echo $$? > exit-code ; } | tee test.log | egrep '^(ok|FAIL)\s*github.com/hashicorp/consul'
 	@echo "Exit code: $$(cat exit-code)" >> test.log
 	@# This prints all the race report between ====== lines
 	@awk '/^WARNING: DATA RACE/ {do_print=1; print "=================="} do_print==1 {print} /^={10,}/ {do_print=0}' test.log || true

@@ -103,7 +103,7 @@ func newACLManager(config *config.RuntimeConfig) (*aclManager, error) {
 		down = acl.AllowAll()
 	case "deny":
 		down = acl.DenyAll()
-	case "extend-cache":
+	case "async-cache", "extend-cache":
 		// Leave the down policy as nil to signal this.
 	default:
 		return nil, fmt.Errorf("invalid ACL down policy %q", config.ACLDownPolicy)

@@ -274,79 +274,82 @@ func TestACL_Down_Allow(t *testing.T) {
 
 func TestACL_Down_Extend(t *testing.T) {
 	t.Parallel()
-	a := NewTestAgent(t.Name(), TestACLConfig()+`
-		acl_down_policy = "extend-cache"
-		acl_enforce_version_8 = true
-	`)
-	defer a.Shutdown()
+	aclExtendPolicies := []string{"extend-cache", "async-cache"}
+	for _, aclDownPolicy := range aclExtendPolicies {
+		a := NewTestAgent(t.Name(), TestACLConfig()+`
+		acl_down_policy = "`+aclDownPolicy+`"
+		acl_enforce_version_8 = true
+	`)
+		defer a.Shutdown()
 
 	m := MockServer{
 		// Populate the cache for one of the tokens.
 		getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error {
 			*reply = structs.ACLPolicy{
 				Parent: "allow",
 				Policy: &rawacl.Policy{
 					Agents: []*rawacl.AgentPolicy{
 						&rawacl.AgentPolicy{
 							Node: a.config.NodeName,
 							Policy: "read",
 						},
 					},
 				},
 			}
 			return nil
 		},
 	}
 	if err := a.registerEndpoint("ACL", &m); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
 	acl, err := a.resolveToken("yep")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if acl == nil {
 		t.Fatalf("should not be nil")
 	}
 	if !acl.AgentRead(a.config.NodeName) {
 		t.Fatalf("should allow")
 	}
 	if acl.AgentWrite(a.config.NodeName) {
 		t.Fatalf("should deny")
 	}
 
 	// Now take down ACLs and make sure a new token fails to resolve.
 	m.getPolicyFn = func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error {
 		return fmt.Errorf("ACLs are broken")
 	}
 	acl, err = a.resolveToken("nope")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if acl == nil {
 		t.Fatalf("should not be nil")
 	}
 	if acl.AgentRead(a.config.NodeName) {
 		t.Fatalf("should deny")
 	}
 	if acl.AgentWrite(a.config.NodeName) {
 		t.Fatalf("should deny")
 	}
 
 	// Read the token from the cache while ACLs are broken, which should
 	// extend.
 	acl, err = a.resolveToken("yep")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if acl == nil {
 		t.Fatalf("should not be nil")
 	}
 	if !acl.AgentRead(a.config.NodeName) {
 		t.Fatalf("should allow")
 	}
 	if acl.AgentWrite(a.config.NodeName) {
 		t.Fatalf("should deny")
 	}
+	}
 }
 
@@ -94,8 +94,10 @@ type RuntimeConfig struct {
 	// ACL's to be used to service requests. This
 	// is the default. If the ACL is not in the cache,
 	// this acts like deny.
+	// * async-cache - Same behaviour as extend-cache, but perform ACL
+	// Lookups asynchronously when cache TTL is expired.
 	//
-	// hcl: acl_down_policy = ("allow"|"deny"|"extend-cache")
+	// hcl: acl_down_policy = ("allow"|"deny"|"extend-cache"|"async-cache")
 	ACLDownPolicy string
 
 	// ACLEnforceVersion8 is used to gate a set of ACL policy features that

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/armon/go-metrics"

@@ -116,6 +117,9 @@ type aclCache struct {
 	// local is a function used to look for an ACL locally if replication is
 	// enabled. This will be nil if replication isn't enabled.
 	local acl.FaultFunc
+
+	fetchMutex sync.RWMutex
+	fetchMap   map[string][]chan (RemoteACLResult)
 }
 
 // newACLCache returns a new non-authoritative cache for ACLs. This is used for

@@ -142,10 +146,17 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
 	}
+	cache.fetchMap = make(map[string][]chan (RemoteACLResult))
 
 	return cache, nil
 }
 
+// Result Type returned when fetching Remote ACLs asynchronously
+type RemoteACLResult struct {
+	result acl.ACL
+	err    error
+}
+
 // lookupACL is used when we are non-authoritative, and need to resolve an ACL.
 func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
 	// Check the cache for the ACL.

@@ -161,8 +172,23 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
 		return cached.ACL, nil
 	}
 	metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
+	res := c.lookupACLRemote(id, authDC, cached)
+	return res.result, res.err
+}
 
-	// Attempt to refresh the policy from the ACL datacenter via an RPC.
+func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) {
+	c.fetchMutex.Lock()
+	channels := c.fetchMap[id]
+	delete(c.fetchMap, id)
+	c.fetchMutex.Unlock()
+	aclResult := RemoteACLResult{theACL, err}
+	for _, cx := range channels {
+		cx <- aclResult
+		close(cx)
+	}
+}
+
+func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) {
 	args := structs.ACLPolicyRequest{
 		Datacenter: authDC,
 		ACL: id,

@@ -173,13 +199,21 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
 	var reply structs.ACLPolicy
 	err := c.rpc("ACL.GetPolicy", &args, &reply)
 	if err == nil {
-		return c.useACLPolicy(id, authDC, cached, &reply)
+		theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
+		if cached != nil && theACL != nil {
+			cached.ACL = theACL
+			cached.ETag = reply.ETag
+			cached.Expires = time.Now().Add(c.config.ACLTTL)
+		}
+		c.fireResult(id, theACL, theError)
+		return
 	}
 
 	// Check for not-found, which will cause us to bail immediately. For any
 	// other error we report it in the logs but can continue.
 	if acl.IsErrNotFound(err) {
-		return nil, acl.ErrNotFound
+		c.fireResult(id, nil, acl.ErrNotFound)
+		return
 	}
 	c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
 
@@ -200,7 +234,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
 	// local ACL fault function is registered to query replicated ACL data,
 	// and the user's policy allows it, we will try locally before we give
 	// up.
-	if c.local != nil && c.config.ACLDownPolicy == "extend-cache" {
+	if c.local != nil && (c.config.ACLDownPolicy == "extend-cache" || c.config.ACLDownPolicy == "async-cache") {
 		parent, rules, err := c.local(id)
 		if err != nil {
 			// We don't make an exception here for ACLs that aren't

@@ -227,24 +261,58 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
 		reply.TTL = c.config.ACLTTL
 		reply.Parent = parent
 		reply.Policy = policy
-		return c.useACLPolicy(id, authDC, cached, &reply)
+		theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
+		if cached != nil && theACL != nil {
+			cached.ACL = theACL
+			cached.ETag = reply.ETag
+			cached.Expires = time.Now().Add(c.config.ACLTTL)
+		}
+		c.fireResult(id, theACL, theError)
+		return
 	}
 
 ACL_DOWN:
 	// Unable to refresh, apply the down policy.
 	switch c.config.ACLDownPolicy {
 	case "allow":
-		return acl.AllowAll(), nil
-	case "extend-cache":
+		c.fireResult(id, acl.AllowAll(), nil)
+		return
+	case "async-cache", "extend-cache":
 		if cached != nil {
-			return cached.ACL, nil
+			c.fireResult(id, cached.ACL, nil)
+			return
 		}
 		fallthrough
 	default:
-		return acl.DenyAll(), nil
+		c.fireResult(id, acl.DenyAll(), nil)
+		return
 	}
 }
+
+func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult {
+	// Attempt to refresh the policy from the ACL datacenter via an RPC.
+	myChan := make(chan RemoteACLResult)
+	mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "async-cache"
+	c.fetchMutex.Lock()
+	clients, ok := c.fetchMap[id]
+	if !ok || clients == nil {
+		clients = make([]chan RemoteACLResult, 0)
+	}
+	if mustWaitForResult {
+		c.fetchMap[id] = append(clients, myChan)
+	}
+	c.fetchMutex.Unlock()
+
+	if !ok {
+		go c.loadACLInChan(id, authDC, cached)
+	}
+	if !mustWaitForResult {
+		return RemoteACLResult{cached.ACL, nil}
+	}
+	res := <-myChan
+	return res
+}
 
 // useACLPolicy handles an ACLPolicy response
 func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
 	// Check if we can used the cached policy

@@ -508,193 +508,201 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
 
 func TestACL_DownPolicy_ExtendCache(t *testing.T) {
 	t.Parallel()
+	aclExtendPolicies := []string{"extend-cache", "async-cache"} //"async-cache"
+
+	for _, aclDownPolicy := range aclExtendPolicies {
 	dir1, s1 := testServerWithConfig(t, func(c *Config) {
 		c.ACLDatacenter = "dc1"
 		c.ACLTTL = 0
-		c.ACLDownPolicy = "extend-cache"
+		c.ACLDownPolicy = aclDownPolicy
 		c.ACLMasterToken = "root"
 	})
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
 	client := rpcClient(t, s1)
 	defer client.Close()
 
 	dir2, s2 := testServerWithConfig(t, func(c *Config) {
 		c.ACLDatacenter = "dc1" // Enable ACLs!
 		c.ACLTTL = 0
-		c.ACLDownPolicy = "extend-cache"
+		c.ACLDownPolicy = aclDownPolicy
 		c.Bootstrap = false // Disable bootstrap
 	})
 	defer os.RemoveAll(dir2)
 	defer s2.Shutdown()
 
 	// Try to join
 	joinLAN(t, s2, s1)
 	retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
 
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 
 	// Create a new token
 	arg := structs.ACLRequest{
 		Datacenter: "dc1",
 		Op: structs.ACLSet,
 		ACL: structs.ACL{
 			Name: "User token",
 			Type: structs.ACLTypeClient,
 			Rules: testACLPolicy,
 		},
 		WriteRequest: structs.WriteRequest{Token: "root"},
 	}
 	var id string
 	if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
 	// find the non-authoritative server
 	var nonAuth *Server
 	var auth *Server
 	if !s1.IsLeader() {
 		nonAuth = s1
 		auth = s2
 	} else {
 		nonAuth = s2
 		auth = s1
 	}
 
 	// Warm the caches
 	aclR, err := nonAuth.resolveToken(id)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if aclR == nil {
 		t.Fatalf("bad acl: %#v", aclR)
 	}
 
 	// Kill the authoritative server
 	auth.Shutdown()
 
 	// Token should resolve into cached copy
 	aclR2, err := nonAuth.resolveToken(id)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if aclR2 != aclR {
 		t.Fatalf("bad acl: %#v", aclR)
 	}
+	}
 }
 
 func TestACL_Replication(t *testing.T) {
 	t.Parallel()
+	aclExtendPolicies := []string{"extend-cache", "async-cache"} //"async-cache"
+
+	for _, aclDownPolicy := range aclExtendPolicies {
 	dir1, s1 := testServerWithConfig(t, func(c *Config) {
 		c.ACLDatacenter = "dc1"
 		c.ACLMasterToken = "root"
 	})
 	defer os.RemoveAll(dir1)
 	defer s1.Shutdown()
 	client := rpcClient(t, s1)
 	defer client.Close()
 
 	dir2, s2 := testServerWithConfig(t, func(c *Config) {
 		c.Datacenter = "dc2"
 		c.ACLDatacenter = "dc1"
 		c.ACLDefaultPolicy = "deny"
-		c.ACLDownPolicy = "extend-cache"
+		c.ACLDownPolicy = aclDownPolicy
 		c.EnableACLReplication = true
 		c.ACLReplicationInterval = 10 * time.Millisecond
 		c.ACLReplicationApplyLimit = 1000000
 	})
 	s2.tokens.UpdateACLReplicationToken("root")
 	defer os.RemoveAll(dir2)
 	defer s2.Shutdown()
 
 	dir3, s3 := testServerWithConfig(t, func(c *Config) {
 		c.Datacenter = "dc3"
 		c.ACLDatacenter = "dc1"
 		c.ACLDownPolicy = "deny"
 		c.EnableACLReplication = true
 		c.ACLReplicationInterval = 10 * time.Millisecond
 		c.ACLReplicationApplyLimit = 1000000
 	})
 	s3.tokens.UpdateACLReplicationToken("root")
 	defer os.RemoveAll(dir3)
 	defer s3.Shutdown()
 
 	// Try to join.
 	joinWAN(t, s2, s1)
 	joinWAN(t, s3, s1)
 	testrpc.WaitForLeader(t, s1.RPC, "dc1")
 	testrpc.WaitForLeader(t, s1.RPC, "dc2")
 	testrpc.WaitForLeader(t, s1.RPC, "dc3")
 
 	// Create a new token.
 	arg := structs.ACLRequest{
 		Datacenter: "dc1",
 		Op: structs.ACLSet,
 		ACL: structs.ACL{
 			Name: "User token",
 			Type: structs.ACLTypeClient,
 			Rules: testACLPolicy,
 		},
 		WriteRequest: structs.WriteRequest{Token: "root"},
 	}
 	var id string
 	if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	// Wait for replication to occur.
 	retry.Run(t, func(r *retry.R) {
 		_, acl, err := s2.fsm.State().ACLGet(nil, id)
 		if err != nil {
 			r.Fatal(err)
 		}
 		if acl == nil {
 			r.Fatal(nil)
 		}
 		_, acl, err = s3.fsm.State().ACLGet(nil, id)
 		if err != nil {
 			r.Fatal(err)
 		}
 		if acl == nil {
 			r.Fatal(nil)
 		}
 	})
 
 	// Kill the ACL datacenter.
 	s1.Shutdown()
 
 	// Token should resolve on s2, which has replication + extend-cache.
 	acl, err := s2.resolveToken(id)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if acl == nil {
 		t.Fatalf("missing acl")
 	}
 
 	// Check the policy
 	if acl.KeyRead("bar") {
 		t.Fatalf("unexpected read")
 	}
 	if !acl.KeyRead("foo/test") {
 		t.Fatalf("unexpected failed read")
 	}
 
 	// Although s3 has replication, and we verified that the ACL is there,
 	// it can not be used because of the down policy.
 	acl, err = s3.resolveToken(id)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if acl == nil {
 		t.Fatalf("missing acl")
 	}
 
 	// Check the policy.
 	if acl.KeyRead("bar") {
 		t.Fatalf("unexpected read")
 	}
 	if acl.KeyRead("foo/test") {
 		t.Fatalf("unexpected read")
 	}
+	}
 }
 
@@ -235,8 +235,9 @@ type Config struct {
 
 	// ACLDownPolicy controls the behavior of ACLs if the ACLDatacenter
 	// cannot be contacted. It can be either "deny" to deny all requests,
-	// or "extend-cache" which ignores the ACLCacheInterval and uses
-	// cached policies. If a policy is not in the cache, it acts like deny.
+	// "extend-cache" or "async-cache" which ignores the ACLCacheInterval and
+	// uses cached policies.
+	// If a policy is not in the cache, it acts like deny.
 	// "allow" can be used to allow all requests. This is not recommended.
 	ACLDownPolicy string
 
@@ -378,7 +379,7 @@ func (c *Config) CheckACL() error {
 	switch c.ACLDownPolicy {
 	case "allow":
 	case "deny":
-	case "extend-cache":
+	case "async-cache", "extend-cache":
 	default:
 		return fmt.Errorf("Unsupported down ACL policy: %s", c.ACLDownPolicy)
 	}

@@ -496,11 +496,14 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
   to enable ACL support.
 
 * <a name="acl_down_policy"></a><a href="#acl_down_policy">`acl_down_policy`</a> - Either
-  "allow", "deny" or "extend-cache"; "extend-cache" is the default. In the case that the
+  "allow", "deny", "extend-cache" or "async-cache"; "extend-cache" is the default. In the case that the
   policy for a token cannot be read from the [`acl_datacenter`](#acl_datacenter) or leader
   node, the down policy is applied. In "allow" mode, all actions are permitted, "deny" restricts
   all operations, and "extend-cache" allows any cached ACLs to be used, ignoring their TTL
   values. If a non-cached ACL is used, "extend-cache" acts like "deny".
+  The value "async-cache" acts the same way as "extend-cache", but performs the update
+  asynchronously when an ACL is already cached and its TTL has expired; as a result, high
+  latency between the authoritative ACL datacenter and other datacenters does not slow down operations.
 
 * <a name="acl_agent_master_token"></a><a href="#acl_agent_master_token">`acl_agent_master_token`</a> -
   Used to access <a href="/api/agent.html">agent endpoints</a> that require agent read

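To make the documented difference between "extend-cache" and "async-cache" concrete, here is a small standalone sketch (hypothetical names and types, not Consul code): when a cached entry's TTL has expired, "extend-cache" blocks the caller on the refresh and falls back to the cached value only if the refresh fails, while "async-cache" returns the cached value immediately and refreshes in the background.

// Standalone sketch contrasting "extend-cache" and "async-cache". Illustrative only.
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a cached ACL decision with an expiry time.
type entry struct {
	policy  string
	expires time.Time
}

type cache struct {
	mu         sync.Mutex
	entries    map[string]entry
	downPolicy string // "extend-cache" or "async-cache"
}

// refresh simulates a slow RPC to the authoritative ACL datacenter.
func (c *cache) refresh(id string) (string, error) {
	time.Sleep(200 * time.Millisecond)
	return "allow", nil
}

// lookup resolves id, applying the configured down policy when the cached
// entry exists but its TTL has expired.
func (c *cache) lookup(id string) (string, error) {
	c.mu.Lock()
	e, cached := c.entries[id]
	c.mu.Unlock()

	if cached && time.Now().Before(e.expires) {
		return e.policy, nil // fresh cache hit
	}

	if cached && c.downPolicy == "async-cache" {
		// Serve the stale entry immediately; refresh in the background.
		go func() {
			if p, err := c.refresh(id); err == nil {
				c.mu.Lock()
				c.entries[id] = entry{policy: p, expires: time.Now().Add(30 * time.Second)}
				c.mu.Unlock()
			}
		}()
		return e.policy, nil
	}

	// extend-cache (and cache misses): wait for the refresh, falling back to
	// the stale entry only if the refresh fails.
	p, err := c.refresh(id)
	if err != nil {
		if cached {
			return e.policy, nil
		}
		return "", err
	}
	c.mu.Lock()
	c.entries[id] = entry{policy: p, expires: time.Now().Add(30 * time.Second)}
	c.mu.Unlock()
	return p, nil
}

func main() {
	for _, policy := range []string{"extend-cache", "async-cache"} {
		c := &cache{
			entries:    map[string]entry{"token-1": {policy: "allow", expires: time.Now().Add(-time.Second)}},
			downPolicy: policy,
		}
		start := time.Now()
		c.lookup("token-1")
		fmt.Printf("%-12s lookup with expired entry took %v\n", policy, time.Since(start).Round(time.Millisecond))
	}
}

With the simulated 200 ms refresh, the expired-entry lookup takes roughly 200 ms under "extend-cache" and returns almost immediately under "async-cache", which is the latency benefit the documentation above describes.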
@@ -1061,6 +1061,11 @@ and the [`acl_down_policy`](/docs/agent/options.html#acl_down_policy)
 is set to "extend-cache", tokens will be resolved during the outage using the
 replicated set of ACLs. An [ACL replication status](/api/acl.html#acl_replication_status)
 endpoint is available to monitor the health of the replication process.
+Also note that in recent versions of Consul (greater than 1.2.0), setting
+`acl_down_policy = "async-cache"` refreshes tokens asynchronously when an ACL is
+already cached but its TTL has expired, with otherwise the same semantics as "extend-cache".
+This helps avoid problems when connectivity to the authoritative ACL datacenter is not
+completely broken, but very slow.
 
 Locally-resolved ACLs will be cached using the [`acl_ttl`](/docs/agent/options.html#acl_ttl)
 setting of the non-authoritative datacenter, so these entries may persist in the