From bd124a8da3543fe970943d0b1f76a741bdccbdaf Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 8 Aug 2014 15:32:43 -0700 Subject: [PATCH] consul: Pulling in ACLs --- consul/acl.go | 79 +++++++++++++++++++++++++++++++++++++ consul/acl_endpoint.go | 20 ++++++++++ consul/acl_endpoint_test.go | 45 ++++++++++++++++++++- consul/server.go | 35 ++++++++++++++++ consul/structs/structs.go | 10 ++++- 5 files changed, 186 insertions(+), 3 deletions(-) create mode 100644 consul/acl.go diff --git a/consul/acl.go b/consul/acl.go new file mode 100644 index 000000000..29fe4c5f6 --- /dev/null +++ b/consul/acl.go @@ -0,0 +1,79 @@ +package consul + +import ( + "fmt" + "time" + + "github.com/hashicorp/consul/acl" +) + +// aclCacheEntry is used to cache non-authoritative ACL's +// If non-authoritative, then we must respect a TTL +type aclCacheEntry struct { + ACL acl.ACL + TTL time.Duration + Expires time.Time +} + +// aclFault is used to fault in the rules for an ACL if we take a miss +func (s *Server) aclFault(id string) (string, error) { + state := s.fsm.State() + _, acl, err := state.ACLGet(id) + if err != nil { + return "", err + } + if acl == nil { + return "", fmt.Errorf("ACL not found: %s", id) + } + return acl.Rules, nil +} + +// resolveToken is used to resolve an ACL is any is appropriate +func (s *Server) resolveToken(id string) (acl.ACL, error) { + // Check if there is no ACL datacenter (ACL's disabled) + authDC := s.config.ACLDatacenter + if authDC == "" { + return nil, nil + } + + // Check if we are the ACL datacenter and the leader, use the + // authoritative cache + if s.config.Datacenter == authDC && s.IsLeader() { + return s.aclAuthCache.GetACL(id) + } + + // Use our non-authoritative cache + return s.lookupACL(id) +} + +// lookupACL is used when we are non-authoritative, and need +// to resolve an ACL +func (s *Server) lookupACL(id string) (acl.ACL, error) { + // Check the cache for the ACL + var cached *aclCacheEntry + raw, ok := s.aclCache.Get(id) + if ok { + cached = raw.(*aclCacheEntry) + } + + // Check for live cache + if cached != nil && time.Now().Before(cached.Expires) { + return cached.ACL, nil + } + + // Attempt to refresh the policy + // TODO: GetPolicy... + + // Unable to refresh, apply the down policy + switch s.config.ACLDownPolicy { + case "allow": + return acl.AllowAll(), nil + case "extend-cache": + if cached != nil { + return cached.ACL, nil + } + fallthrough + default: + return acl.DenyAll(), nil + } +} diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 89c7ee6d6..908d2fcb5 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -81,6 +81,26 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, }) } +// GetPolicy is used to retrieve a compiled policy object with a TTL. Does not +// support a blocking query. +func (a *ACL) GetPolicy(args *structs.ACLSpecificRequest, reply *structs.ACLPolicy) error { + if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done { + return err + } + + // Get the policy via the cache + policy, err := a.srv.aclAuthCache.GetACLPolicy(args.ACL) + if err != nil { + return err + } + + // Setup the response + reply.Policy = policy + reply.TTL = a.srv.config.ACLTTL + a.srv.setQueryMeta(&reply.QueryMeta) + return nil +} + // List is used to list all the ACLs func (a *ACL) List(args *structs.DCSpecificRequest, reply *structs.IndexedACLs) error { diff --git a/consul/acl_endpoint_test.go b/consul/acl_endpoint_test.go index 7f0878797..032df5520 100644 --- a/consul/acl_endpoint_test.go +++ b/consul/acl_endpoint_test.go @@ -1,10 +1,12 @@ package consul import ( - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/testutil" "os" "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestACLEndpoint_Apply(t *testing.T) { @@ -106,6 +108,45 @@ func TestACLEndpoint_Get(t *testing.T) { } } +func TestACLEndpoint_GetPolicy(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + }, + } + var out string + if err := client.Call("ACL.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + getR := structs.ACLSpecificRequest{ + Datacenter: "dc1", + ACL: out, + } + var acls structs.ACLPolicy + if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil { + t.Fatalf("err: %v", err) + } + + if acls.Policy == nil { + t.Fatalf("Bad: %v", acls) + } + if acls.TTL != 30*time.Second { + t.Fatalf("bad: %v", acls) + } +} + func TestACLEndpoint_List(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/server.go b/consul/server.go index b07522e15..5e0465213 100644 --- a/consul/server.go +++ b/consul/server.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/golang-lru" "github.com/hashicorp/raft" "github.com/hashicorp/raft-mdb" "github.com/hashicorp/serf/serf" @@ -43,11 +45,21 @@ const ( // serverMaxStreams controsl how many idle streams we keep // open to a server serverMaxStreams = 64 + + // Maximum number of cached ACL entries + aclCacheSize = 256 ) // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { + // aclAuthCache is the authoritative ACL cache + aclAuthCache *acl.Cache + + // aclCache is a non-authoritative ACL cache + aclCache *lru.Cache + + // Consul configuration config *Config // Connection pool to other consul servers @@ -181,6 +193,29 @@ func NewServer(config *Config) (*Server, error) { shutdownCh: make(chan struct{}), } + // Determine the ACL root policy + var aclRoot acl.ACL + switch config.ACLDefaultPolicy { + case "allow": + aclRoot = acl.AllowAll() + case "deny": + aclRoot = acl.DenyAll() + } + + // Initialize the authoritative ACL cache + s.aclAuthCache, err = acl.NewCache(aclCacheSize, aclRoot, s.aclFault) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to create ACL cache: %v", err) + } + + // Initialize the non-authoritative ACL cache + s.aclCache, err = lru.New(aclCacheSize) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to create ACL cache: %v", err) + } + // Initialize the RPC layer if err := s.setupRPC(tlsConfig); err != nil { s.Shutdown() diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 27cffd0b2..3c9b78c9a 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -3,8 +3,10 @@ package structs import ( "bytes" "fmt" - "github.com/ugorji/go/codec" "time" + + "github.com/hashicorp/consul/acl" + "github.com/ugorji/go/codec" ) var ( @@ -469,6 +471,12 @@ type IndexedACLs struct { QueryMeta } +type ACLPolicy struct { + Policy *acl.Policy + TTL time.Duration + QueryMeta +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{}