consul: Pulling in ACLs

This commit is contained in:
Armon Dadgar 2014-08-08 15:32:43 -07:00
parent dcd4508ca9
commit bd124a8da3
5 changed files with 186 additions and 3 deletions

79
consul/acl.go Normal file
View File

@ -0,0 +1,79 @@
package consul
import (
"fmt"
"time"
"github.com/hashicorp/consul/acl"
)
// aclCacheEntry is used to cache non-authoritative ACL's
// If non-authoritative, then we must respect a TTL
type aclCacheEntry struct {
ACL acl.ACL
TTL time.Duration
Expires time.Time
}
// aclFault is used to fault in the rules for an ACL if we take a miss
func (s *Server) aclFault(id string) (string, error) {
state := s.fsm.State()
_, acl, err := state.ACLGet(id)
if err != nil {
return "", err
}
if acl == nil {
return "", fmt.Errorf("ACL not found: %s", id)
}
return acl.Rules, nil
}
// resolveToken is used to resolve an ACL is any is appropriate
func (s *Server) resolveToken(id string) (acl.ACL, error) {
// Check if there is no ACL datacenter (ACL's disabled)
authDC := s.config.ACLDatacenter
if authDC == "" {
return nil, nil
}
// Check if we are the ACL datacenter and the leader, use the
// authoritative cache
if s.config.Datacenter == authDC && s.IsLeader() {
return s.aclAuthCache.GetACL(id)
}
// Use our non-authoritative cache
return s.lookupACL(id)
}
// lookupACL is used when we are non-authoritative, and need
// to resolve an ACL
func (s *Server) lookupACL(id string) (acl.ACL, error) {
// Check the cache for the ACL
var cached *aclCacheEntry
raw, ok := s.aclCache.Get(id)
if ok {
cached = raw.(*aclCacheEntry)
}
// Check for live cache
if cached != nil && time.Now().Before(cached.Expires) {
return cached.ACL, nil
}
// Attempt to refresh the policy
// TODO: GetPolicy...
// Unable to refresh, apply the down policy
switch s.config.ACLDownPolicy {
case "allow":
return acl.AllowAll(), nil
case "extend-cache":
if cached != nil {
return cached.ACL, nil
}
fallthrough
default:
return acl.DenyAll(), nil
}
}

View File

@ -81,6 +81,26 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
}) })
} }
// GetPolicy is used to retrieve a compiled policy object with a TTL. Does not
// support a blocking query.
func (a *ACL) GetPolicy(args *structs.ACLSpecificRequest, reply *structs.ACLPolicy) error {
if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done {
return err
}
// Get the policy via the cache
policy, err := a.srv.aclAuthCache.GetACLPolicy(args.ACL)
if err != nil {
return err
}
// Setup the response
reply.Policy = policy
reply.TTL = a.srv.config.ACLTTL
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// List is used to list all the ACLs // List is used to list all the ACLs
func (a *ACL) List(args *structs.DCSpecificRequest, func (a *ACL) List(args *structs.DCSpecificRequest,
reply *structs.IndexedACLs) error { reply *structs.IndexedACLs) error {

View File

@ -1,10 +1,12 @@
package consul package consul
import ( import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os" "os"
"testing" "testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
) )
func TestACLEndpoint_Apply(t *testing.T) { func TestACLEndpoint_Apply(t *testing.T) {
@ -106,6 +108,45 @@ func TestACLEndpoint_Get(t *testing.T) {
} }
} }
func TestACLEndpoint_GetPolicy(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
getR := structs.ACLSpecificRequest{
Datacenter: "dc1",
ACL: out,
}
var acls structs.ACLPolicy
if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil {
t.Fatalf("err: %v", err)
}
if acls.Policy == nil {
t.Fatalf("Bad: %v", acls)
}
if acls.TTL != 30*time.Second {
t.Fatalf("bad: %v", acls)
}
}
func TestACLEndpoint_List(t *testing.T) { func TestACLEndpoint_List(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -15,6 +15,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/golang-lru"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-mdb" "github.com/hashicorp/raft-mdb"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -43,11 +45,21 @@ const (
// serverMaxStreams controsl how many idle streams we keep // serverMaxStreams controsl how many idle streams we keep
// open to a server // open to a server
serverMaxStreams = 64 serverMaxStreams = 64
// Maximum number of cached ACL entries
aclCacheSize = 256
) )
// Server is Consul server which manages the service discovery, // Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools. // health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct { type Server struct {
// aclAuthCache is the authoritative ACL cache
aclAuthCache *acl.Cache
// aclCache is a non-authoritative ACL cache
aclCache *lru.Cache
// Consul configuration
config *Config config *Config
// Connection pool to other consul servers // Connection pool to other consul servers
@ -181,6 +193,29 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
// Determine the ACL root policy
var aclRoot acl.ACL
switch config.ACLDefaultPolicy {
case "allow":
aclRoot = acl.AllowAll()
case "deny":
aclRoot = acl.DenyAll()
}
// Initialize the authoritative ACL cache
s.aclAuthCache, err = acl.NewCache(aclCacheSize, aclRoot, s.aclFault)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
}
// Initialize the non-authoritative ACL cache
s.aclCache, err = lru.New(aclCacheSize)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
}
// Initialize the RPC layer // Initialize the RPC layer
if err := s.setupRPC(tlsConfig); err != nil { if err := s.setupRPC(tlsConfig); err != nil {
s.Shutdown() s.Shutdown()

View File

@ -3,8 +3,10 @@ package structs
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/ugorji/go/codec"
"time" "time"
"github.com/hashicorp/consul/acl"
"github.com/ugorji/go/codec"
) )
var ( var (
@ -469,6 +471,12 @@ type IndexedACLs struct {
QueryMeta QueryMeta
} }
type ACLPolicy struct {
Policy *acl.Policy
TTL time.Duration
QueryMeta
}
// msgpackHandle is a shared handle for encoding/decoding of structs // msgpackHandle is a shared handle for encoding/decoding of structs
var msgpackHandle = &codec.MsgpackHandle{} var msgpackHandle = &codec.MsgpackHandle{}