Adds complete ACL coverage for /v1/session endpoints.

This commit is contained in:
James Phillips 2016-12-12 21:59:22 -08:00
parent bdafd33ce8
commit 535d6b21b4
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
5 changed files with 468 additions and 15 deletions

View File

@ -46,6 +46,7 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request)
},
}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Handle optional request body
if req.ContentLength > 0 {
@ -117,6 +118,7 @@ func (s *HTTPServer) SessionDestroy(resp http.ResponseWriter, req *http.Request)
Op: structs.SessionDestroy,
}
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Pull out the session id
args.Session.ID = strings.TrimPrefix(req.URL.Path, "/v1/session/destroy/")

View File

@ -344,6 +344,15 @@ func (f *aclFilter) allowService(service string) bool {
return f.acl.ServiceRead(service)
}
// allowSession is used to determine if a session for a node is accessible for
// an ACL.
func (f *aclFilter) allowSession(node string) bool {
if !f.enforceVersion8 {
return true
}
return f.acl.SessionRead(node)
}
// filterHealthChecks is used to filter a set of health checks down based on
// the configured ACL rules for a token.
func (f *aclFilter) filterHealthChecks(checks *structs.HealthChecks) {
@ -422,6 +431,21 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
*nodes = csn
}
// filterSessions is used to filter a set of sessions based on ACLs.
func (f *aclFilter) filterSessions(sessions *structs.Sessions) {
s := *sessions
for i := 0; i < len(s); i++ {
session := s[i]
if f.allowSession(session.Node) {
continue
}
f.logger.Printf("[DEBUG] consul: dropping session %q from result due to ACLs", session.ID)
s = append(s[:i], s[i+1:]...)
i--
}
*sessions = s
}
// filterCoordinates is used to filter nodes in a coordinate dump based on ACL
// rules.
func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) {
@ -598,6 +622,9 @@ func (s *Server) filterACL(token string, subj interface{}) error {
case *structs.IndexedServices:
filt.filterServices(v.Services)
case *structs.IndexedSessions:
filt.filterSessions(&v.Sessions)
case *structs.IndexedPreparedQueries:
filt.filterPreparedQueries(&v.Queries)

View File

@ -1261,6 +1261,39 @@ func TestACL_filterCoordinates(t *testing.T) {
}
}
func TestACL_filterSessions(t *testing.T) {
// Create a session list.
sessions := structs.Sessions{
&structs.Session{
Node: "foo",
},
&structs.Session{
Node: "bar",
},
}
// Try permissive filtering.
filt := newAclFilter(acl.AllowAll(), nil, true)
filt.filterSessions(&sessions)
if len(sessions) != 2 {
t.Fatalf("bad: %#v", sessions)
}
// Try restrictive filtering but with version 8 enforcement turned off.
filt = newAclFilter(acl.DenyAll(), nil, false)
filt.filterSessions(&sessions)
if len(sessions) != 2 {
t.Fatalf("bad: %#v", sessions)
}
// Try restrictive filtering with version 8 enforcement turned on.
filt = newAclFilter(acl.DenyAll(), nil, true)
filt.filterSessions(&sessions)
if len(sessions) != 0 {
t.Fatalf("bad: %#v", sessions)
}
}
func TestACL_filterNodeDump(t *testing.T) {
// Create a node dump.
fill := func() structs.NodeDump {

View File

@ -30,6 +30,33 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
return fmt.Errorf("Must provide Node")
}
// Fetch the ACL token, if any, and apply the policy.
acl, err := s.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && s.srv.config.ACLEnforceVersion8 {
switch args.Op {
case structs.SessionDestroy:
state := s.srv.fsm.State()
_, existing, err := state.SessionGet(args.Session.ID)
if err != nil {
return fmt.Errorf("Unknown session %q", args.Session.ID)
}
if !acl.SessionWrite(existing.Node) {
return permissionDeniedErr
}
case structs.SessionCreate:
if !acl.SessionWrite(args.Session.Node) {
return permissionDeniedErr
}
default:
return fmt.Errorf("Invalid session operation %q, args.Op")
}
}
// Ensure that the specified behavior is allowed
switch args.Session.Behavior {
case "":
@ -130,6 +157,9 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
} else {
reply.Sessions = nil
}
if err := s.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}
@ -154,6 +184,9 @@ func (s *Session) List(args *structs.DCSpecificRequest,
}
reply.Index, reply.Sessions = index, sessions
if err := s.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}
@ -178,6 +211,9 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
}
reply.Index, reply.Sessions = index, sessions
if err := s.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}
@ -190,21 +226,35 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest,
}
defer metrics.MeasureSince([]string{"consul", "session", "renew"}, time.Now())
// Get the session, from local state
// Get the session, from local state.
state := s.srv.fsm.State()
index, session, err := state.SessionGet(args.Session)
if err != nil {
return err
}
// Reset the session TTL timer
reply.Index = index
if session != nil {
if session == nil {
return nil
}
// Fetch the ACL token, if any, and apply the policy.
acl, err := s.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && s.srv.config.ACLEnforceVersion8 {
if !acl.SessionWrite(session.Node) {
return permissionDeniedErr
}
}
// Reset the session TTL timer.
reply.Sessions = structs.Sessions{session}
if err := s.srv.resetSessionTimer(args.Session, session); err != nil {
s.srv.logger.Printf("[ERR] consul.session: Session renew failed: %v", err)
return err
}
}
return nil
}

View File

@ -2,6 +2,7 @@ package consul
import (
"os"
"strings"
"testing"
"time"
@ -11,7 +12,7 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestSessionEndpoint_Apply(t *testing.T) {
func TestSession_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -70,7 +71,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
}
}
func TestSessionEndpoint_DeleteApply(t *testing.T) {
func TestSession_DeleteApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -133,7 +134,101 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
}
}
func TestSessionEndpoint_Get(t *testing.T) {
func TestSession_Apply_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: `
session "foo" {
policy = "write"
}
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
// Just add a node.
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
// Try to create without a token, which will go through since version 8
// enforcement isn't enabled.
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
Name: "my-session",
},
}
var id1 string
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id1); err != nil {
t.Fatalf("err: %v", err)
}
// Now turn on version 8 enforcement and try again, it should be denied.
var id2 string
s1.config.ACLEnforceVersion8 = true
err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id2)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Now set a token and try again. This should go through.
arg.Token = token
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id2); err != nil {
t.Fatalf("err: %v", err)
}
// Do a delete on the first session with version 8 enforcement off and
// no token. This should go through.
var out string
s1.config.ACLEnforceVersion8 = false
arg.Op = structs.SessionDestroy
arg.Token = ""
arg.Session.ID = id1
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Turn on version 8 enforcement and make sure the delete of the second
// session fails.
s1.config.ACLEnforceVersion8 = true
arg.Session.ID = id2
err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Now set a token and try again. This should go through.
arg.Token = token
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestSession_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -176,7 +271,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
}
}
func TestSessionEndpoint_List(t *testing.T) {
func TestSession_List(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -227,7 +322,175 @@ func TestSessionEndpoint_List(t *testing.T) {
}
}
func TestSessionEndpoint_ApplyTimers(t *testing.T) {
func TestSession_Get_List_NodeSessions_ACLFilter(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: `
session "foo" {
policy = "read"
}
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
// Create a node and a session.
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Perform all the read operations, which should go through since version
// 8 ACL enforcement isn't enabled.
getR := structs.SessionSpecificRequest{
Datacenter: "dc1",
Session: out,
}
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
listR := structs.DCSpecificRequest{
Datacenter: "dc1",
}
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
nodeR := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "foo",
}
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
// Now turn on version 8 enforcement and make sure everything is empty.
s1.config.ACLEnforceVersion8 = true
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 0 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 0 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 0 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
// Finally, supply the token and make sure the reads are allowed.
getR.Token = token
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
listR.Token = token
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &listR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
nodeR.Token = token
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &nodeR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
// Try to get a session that doesn't exist to make sure that's handled
// correctly by the filter (it will get passed a nil slice).
getR.Session = "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
{
var sessions structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions.Sessions) != 0 {
t.Fatalf("bad: %v", sessions.Sessions)
}
}
}
func TestSession_ApplyTimers(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -268,7 +531,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
}
}
func TestSessionEndpoint_Renew(t *testing.T) {
func TestSession_Renew(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -428,7 +691,85 @@ func TestSessionEndpoint_Renew(t *testing.T) {
}
}
func TestSessionEndpoint_NodeSessions(t *testing.T) {
func TestSession_Renew_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: `
session "foo" {
policy = "write"
}
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
// Just add a node.
s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
// Create a session. The token won't matter here since we don't have
// version 8 ACL enforcement on yet.
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
Name: "my-session",
},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Renew without a token should go through without version 8 ACL
// enforcement.
renewR := structs.SessionSpecificRequest{
Datacenter: "dc1",
Session: id,
}
var session structs.IndexedSessions
if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil {
t.Fatalf("err: %v", err)
}
// Now turn on version 8 enforcement and the renew should be rejected.
s1.config.ACLEnforceVersion8 = true
err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Set the token and it should go through.
renewR.Token = token
if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestSession_NodeSessions(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -486,7 +827,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
}
}
func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
func TestSession_Apply_BadTTL(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()