From 535d6b21b4fa0c40c0b169935cafb775d8452bbc Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 12 Dec 2016 21:59:22 -0800 Subject: [PATCH] Adds complete ACL coverage for /v1/session endpoints. --- command/agent/session_endpoint.go | 2 + consul/acl.go | 27 +++ consul/acl_test.go | 33 +++ consul/session_endpoint.go | 64 +++++- consul/session_endpoint_test.go | 357 +++++++++++++++++++++++++++++- 5 files changed, 468 insertions(+), 15 deletions(-) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go index 4049cf7d6..92ed9c6b1 100644 --- a/command/agent/session_endpoint.go +++ b/command/agent/session_endpoint.go @@ -46,6 +46,7 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) }, } s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) // Handle optional request body if req.ContentLength > 0 { @@ -117,6 +118,7 @@ func (s *HTTPServer) SessionDestroy(resp http.ResponseWriter, req *http.Request) Op: structs.SessionDestroy, } s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) // Pull out the session id args.Session.ID = strings.TrimPrefix(req.URL.Path, "/v1/session/destroy/") diff --git a/consul/acl.go b/consul/acl.go index 7def53c3a..36135d9ac 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -344,6 +344,15 @@ func (f *aclFilter) allowService(service string) bool { return f.acl.ServiceRead(service) } +// allowSession is used to determine if a session for a node is accessible for +// an ACL. +func (f *aclFilter) allowSession(node string) bool { + if !f.enforceVersion8 { + return true + } + return f.acl.SessionRead(node) +} + // filterHealthChecks is used to filter a set of health checks down based on // the configured ACL rules for a token. func (f *aclFilter) filterHealthChecks(checks *structs.HealthChecks) { @@ -422,6 +431,21 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) { *nodes = csn } +// filterSessions is used to filter a set of sessions based on ACLs. +func (f *aclFilter) filterSessions(sessions *structs.Sessions) { + s := *sessions + for i := 0; i < len(s); i++ { + session := s[i] + if f.allowSession(session.Node) { + continue + } + f.logger.Printf("[DEBUG] consul: dropping session %q from result due to ACLs", session.ID) + s = append(s[:i], s[i+1:]...) + i-- + } + *sessions = s +} + // filterCoordinates is used to filter nodes in a coordinate dump based on ACL // rules. func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) { @@ -598,6 +622,9 @@ func (s *Server) filterACL(token string, subj interface{}) error { case *structs.IndexedServices: filt.filterServices(v.Services) + case *structs.IndexedSessions: + filt.filterSessions(&v.Sessions) + case *structs.IndexedPreparedQueries: filt.filterPreparedQueries(&v.Queries) diff --git a/consul/acl_test.go b/consul/acl_test.go index 52a458195..65ed6f7aa 100644 --- a/consul/acl_test.go +++ b/consul/acl_test.go @@ -1261,6 +1261,39 @@ func TestACL_filterCoordinates(t *testing.T) { } } +func TestACL_filterSessions(t *testing.T) { + // Create a session list. + sessions := structs.Sessions{ + &structs.Session{ + Node: "foo", + }, + &structs.Session{ + Node: "bar", + }, + } + + // Try permissive filtering. + filt := newAclFilter(acl.AllowAll(), nil, true) + filt.filterSessions(&sessions) + if len(sessions) != 2 { + t.Fatalf("bad: %#v", sessions) + } + + // Try restrictive filtering but with version 8 enforcement turned off. + filt = newAclFilter(acl.DenyAll(), nil, false) + filt.filterSessions(&sessions) + if len(sessions) != 2 { + t.Fatalf("bad: %#v", sessions) + } + + // Try restrictive filtering with version 8 enforcement turned on. + filt = newAclFilter(acl.DenyAll(), nil, true) + filt.filterSessions(&sessions) + if len(sessions) != 0 { + t.Fatalf("bad: %#v", sessions) + } +} + func TestACL_filterNodeDump(t *testing.T) { // Create a node dump. fill := func() structs.NodeDump { diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index c6ddbc75c..65272d422 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -30,6 +30,33 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { return fmt.Errorf("Must provide Node") } + // Fetch the ACL token, if any, and apply the policy. + acl, err := s.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && s.srv.config.ACLEnforceVersion8 { + switch args.Op { + case structs.SessionDestroy: + state := s.srv.fsm.State() + _, existing, err := state.SessionGet(args.Session.ID) + if err != nil { + return fmt.Errorf("Unknown session %q", args.Session.ID) + } + if !acl.SessionWrite(existing.Node) { + return permissionDeniedErr + } + + case structs.SessionCreate: + if !acl.SessionWrite(args.Session.Node) { + return permissionDeniedErr + } + + default: + return fmt.Errorf("Invalid session operation %q, args.Op") + } + } + // Ensure that the specified behavior is allowed switch args.Session.Behavior { case "": @@ -130,6 +157,9 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, } else { reply.Sessions = nil } + if err := s.srv.filterACL(args.Token, reply); err != nil { + return err + } return nil }) } @@ -154,6 +184,9 @@ func (s *Session) List(args *structs.DCSpecificRequest, } reply.Index, reply.Sessions = index, sessions + if err := s.srv.filterACL(args.Token, reply); err != nil { + return err + } return nil }) } @@ -178,6 +211,9 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, } reply.Index, reply.Sessions = index, sessions + if err := s.srv.filterACL(args.Token, reply); err != nil { + return err + } return nil }) } @@ -190,21 +226,35 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest, } defer metrics.MeasureSince([]string{"consul", "session", "renew"}, time.Now()) - // Get the session, from local state + // Get the session, from local state. state := s.srv.fsm.State() index, session, err := state.SessionGet(args.Session) if err != nil { return err } - // Reset the session TTL timer reply.Index = index - if session != nil { - reply.Sessions = structs.Sessions{session} - if err := s.srv.resetSessionTimer(args.Session, session); err != nil { - s.srv.logger.Printf("[ERR] consul.session: Session renew failed: %v", err) - return err + if session == nil { + return nil + } + + // Fetch the ACL token, if any, and apply the policy. + acl, err := s.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && s.srv.config.ACLEnforceVersion8 { + if !acl.SessionWrite(session.Node) { + return permissionDeniedErr } } + + // Reset the session TTL timer. + reply.Sessions = structs.Sessions{session} + if err := s.srv.resetSessionTimer(args.Session, session); err != nil { + s.srv.logger.Printf("[ERR] consul.session: Session renew failed: %v", err) + return err + } + return nil } diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index e5e0bed18..275a53aad 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -2,6 +2,7 @@ package consul import ( "os" + "strings" "testing" "time" @@ -11,7 +12,7 @@ import ( "github.com/hashicorp/net-rpc-msgpackrpc" ) -func TestSessionEndpoint_Apply(t *testing.T) { +func TestSession_Apply(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -70,7 +71,7 @@ func TestSessionEndpoint_Apply(t *testing.T) { } } -func TestSessionEndpoint_DeleteApply(t *testing.T) { +func TestSession_DeleteApply(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -133,7 +134,101 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) { } } -func TestSessionEndpoint_Get(t *testing.T) { +func TestSession_Apply_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create the ACL. + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +session "foo" { + policy = "write" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + + var token string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + + // Just add a node. + s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + + // Try to create without a token, which will go through since version 8 + // enforcement isn't enabled. + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + Name: "my-session", + }, + } + var id1 string + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id1); err != nil { + t.Fatalf("err: %v", err) + } + + // Now turn on version 8 enforcement and try again, it should be denied. + var id2 string + s1.config.ACLEnforceVersion8 = true + err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id2) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Now set a token and try again. This should go through. + arg.Token = token + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id2); err != nil { + t.Fatalf("err: %v", err) + } + + // Do a delete on the first session with version 8 enforcement off and + // no token. This should go through. + var out string + s1.config.ACLEnforceVersion8 = false + arg.Op = structs.SessionDestroy + arg.Token = "" + arg.Session.ID = id1 + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Turn on version 8 enforcement and make sure the delete of the second + // session fails. + s1.config.ACLEnforceVersion8 = true + arg.Session.ID = id2 + err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Now set a token and try again. This should go through. + arg.Token = token + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestSession_Get(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -176,7 +271,7 @@ func TestSessionEndpoint_Get(t *testing.T) { } } -func TestSessionEndpoint_List(t *testing.T) { +func TestSession_List(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -227,7 +322,175 @@ func TestSessionEndpoint_List(t *testing.T) { } } -func TestSessionEndpoint_ApplyTimers(t *testing.T) { +func TestSession_Get_List_NodeSessions_ACLFilter(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create the ACL. + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +session "foo" { + policy = "read" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + + var token string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + + // Create a node and a session. + s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out string + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Perform all the read operations, which should go through since version + // 8 ACL enforcement isn't enabled. + getR := structs.SessionSpecificRequest{ + Datacenter: "dc1", + Session: out, + } + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + listR := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + nodeR := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: "foo", + } + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + + // Now turn on version 8 enforcement and make sure everything is empty. + s1.config.ACLEnforceVersion8 = true + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 0 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + { + var sessions structs.IndexedSessions + + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 0 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 0 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + + // Finally, supply the token and make sure the reads are allowed. + getR.Token = token + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + listR.Token = token + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + nodeR.Token = token + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 1 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } + + // Try to get a session that doesn't exist to make sure that's handled + // correctly by the filter (it will get passed a nil slice). + getR.Session = "adf4238a-882b-9ddc-4a9d-5b6758e4159e" + { + var sessions structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + if len(sessions.Sessions) != 0 { + t.Fatalf("bad: %v", sessions.Sessions) + } + } +} + +func TestSession_ApplyTimers(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -268,7 +531,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) { } } -func TestSessionEndpoint_Renew(t *testing.T) { +func TestSession_Renew(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -428,7 +691,85 @@ func TestSessionEndpoint_Renew(t *testing.T) { } } -func TestSessionEndpoint_NodeSessions(t *testing.T) { +func TestSession_Renew_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create the ACL. + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +session "foo" { + policy = "write" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + + var token string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + + // Just add a node. + s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + + // Create a session. The token won't matter here since we don't have + // version 8 ACL enforcement on yet. + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + Name: "my-session", + }, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Renew without a token should go through without version 8 ACL + // enforcement. + renewR := structs.SessionSpecificRequest{ + Datacenter: "dc1", + Session: id, + } + var session structs.IndexedSessions + if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil { + t.Fatalf("err: %v", err) + } + + // Now turn on version 8 enforcement and the renew should be rejected. + s1.config.ACLEnforceVersion8 = true + err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Set the token and it should go through. + renewR.Token = token + if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestSession_NodeSessions(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -486,7 +827,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) { } } -func TestSessionEndpoint_Apply_BadTTL(t *testing.T) { +func TestSession_Apply_BadTTL(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown()