acl: add ACL roles to event stream topic and resolve policies. (#14923)

This changes adds ACL role creation and deletion to the event
stream. It is exposed as a single topic with two types; the filter
is primarily the role ID but also includes the role name.

While conducting this work it was also discovered that the events
stream has its own ACL resolution logic. This did not account for
ACL tokens which included role links, or tokens with expiry times.
ACL role links are now resolved to their policies and tokens are
checked for expiry correctly.
This commit is contained in:
James Rasell 2022-10-20 09:43:35 +02:00 committed by GitHub
parent d7b311ce55
commit 215b4e7e36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 725 additions and 39 deletions

11
.changelog/14923.txt Normal file
View File

@ -0,0 +1,11 @@
```release-note:bug
event stream: Resolve ACL roles within ACL tokens
```
```release-note:bug
event stream: Check ACL token expiry when resolving tokens
```
```release-note:improvement
event stream: Added ACL role topic with create and delete types
```

View File

@ -1366,8 +1366,8 @@ func (a *ACL) ListRoles(
}
// GetRolesByID is used to get a set of ACL Roles as defined by their ID. This
// endpoint is used by the replication process and uses a specific response in
// order to make that process easier.
// endpoint is used by the replication process and Nomad agent client token
// resolution.
func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error {
// This endpoint is only used by the replication process which is only
@ -1382,11 +1382,17 @@ func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACL
}
defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now())
// Check that the caller has a management token and that ACLs are enabled
// properly.
if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil {
// For client typed tokens, allow them to query any roles associated with
// that token. This is used by Nomad agents in client mode which are
// resolving the roles to enforce.
token, err := a.requestACLToken(args.AuthToken)
if err != nil {
return err
} else if acl == nil || !acl.IsManagement() {
}
if token == nil {
return structs.ErrTokenNotFound
}
if token.Type != structs.ACLManagementToken && !token.HasRoles(args.ACLRoleIDs) {
return structs.ErrPermissionDenied
}

View File

@ -2228,6 +2228,7 @@ func TestACL_GetRolesByID(t *testing.T) {
// Try reading a role without setting a correct auth token.
aclRoleReq1 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{"nope"},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
},
@ -2278,6 +2279,48 @@ func TestACL_GetRolesByID(t *testing.T) {
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID)
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID)
// Create a client token which allows us to test client tokens looking up
// their own role assignments.
clientToken1 := &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Name: "acl-endpoint-test-role",
Type: structs.ACLClientToken,
Roles: []*structs.ACLTokenRoleLink{{ID: aclRoles[0].ID}},
}
clientToken1.SetHash()
require.NoError(t, testServer.fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{clientToken1}))
// Use the client token in an attempt to look up an ACL role which is
// assigned to the token, and therefore should work.
aclRoleReq4 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[0].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: clientToken1.SecretID,
},
}
var aclRoleResp4 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4)
require.NoError(t, err)
require.Len(t, aclRoleResp4.ACLRoles, 1)
require.Contains(t, aclRoleResp4.ACLRoles, aclRoles[0].ID)
// Use the client token in an attempt to look up an ACL role which is NOT
// assigned to the token which should fail.
aclRoleReq5 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: clientToken1.SecretID,
},
}
var aclRoleResp5 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5)
require.ErrorContains(t, err, "Permission denied")
// Now test a blocking query, where we wait for an update to the set which
// is triggered by a deletion.
type res struct {
@ -2287,7 +2330,7 @@ func TestACL_GetRolesByID(t *testing.T) {
resultCh := make(chan *res)
go func(resultCh chan *res) {
aclRoleReq5 := &structs.ACLRolesByIDRequest{
aclRoleReq6 := &structs.ACLRolesByIDRequest{
ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
@ -2296,9 +2339,9 @@ func TestACL_GetRolesByID(t *testing.T) {
MaxQueryTime: 10 * time.Second,
},
}
var aclRoleResp5 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5)
resultCh <- &res{err: err, reply: &aclRoleResp5}
var aclRoleResp6 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq6, &aclRoleResp6)
resultCh <- &res{err: err, reply: &aclRoleResp6}
}(resultCh)
// Delete an ACL role from state which should return the blocking query.

View File

@ -28,6 +28,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.ACLRolesDeleteByIDRequestType: structs.TypeACLRoleDeleted,
structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted,
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
@ -77,6 +79,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
ACLPolicy: before,
},
}, true
case TableACLRoles:
before, ok := change.Before.(*structs.ACLRole)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLRole,
Key: before.ID,
FilterKeys: []string{before.Name},
Payload: &structs.ACLRoleStreamEvent{
ACLRole: before,
},
}, true
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
@ -136,6 +151,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
ACLPolicy: after,
},
}, true
case TableACLRoles:
after, ok := change.After.(*structs.ACLRole)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicACLRole,
Key: after.ID,
FilterKeys: []string{after.Name},
Payload: &structs.ACLRoleStreamEvent{
ACLRole: after,
},
}, true
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {

View File

@ -1002,6 +1002,59 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
require.Equal(t, service, eventPayload.Service)
}
func Test_eventsFromChanges_ACLRole(t *testing.T) {
ci.Parallel(t)
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer testState.StopEventBroker()
// Generate a test ACL role.
aclRole := mock.ACLRole()
// Upsert the role into state, skipping the checks perform to ensure the
// linked policies exist.
writeTxn := testState.db.WriteTxn(10)
updated, err := testState.upsertACLRoleTxn(10, writeTxn, aclRole, true)
require.True(t, updated)
require.NoError(t, err)
writeTxn.Txn.Commit()
// Pull the events from the stream.
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLRolesUpsertRequestType}
receivedChange := eventsFromChanges(writeTxn, upsertChange)
// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedChange.Events, 1)
require.Equal(t, structs.TopicACLRole, receivedChange.Events[0].Topic)
require.Equal(t, aclRole.ID, receivedChange.Events[0].Key)
require.Equal(t, aclRole.Name, receivedChange.Events[0].FilterKeys[0])
require.Equal(t, structs.TypeACLRoleUpserted, receivedChange.Events[0].Type)
require.Equal(t, uint64(10), receivedChange.Events[0].Index)
eventPayload := receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
require.Equal(t, aclRole, eventPayload.ACLRole)
// Delete the previously upserted ACL role.
deleteTxn := testState.db.WriteTxn(20)
require.NoError(t, testState.deleteACLRoleByIDTxn(deleteTxn, aclRole.ID))
require.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLRoles, 20}))
deleteTxn.Txn.Commit()
// Pull the events from the stream.
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLRolesDeleteByIDRequestType}
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)
// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedDeleteChange.Events, 1)
require.Equal(t, structs.TopicACLRole, receivedDeleteChange.Events[0].Topic)
require.Equal(t, aclRole.ID, receivedDeleteChange.Events[0].Key)
require.Equal(t, aclRole.Name, receivedDeleteChange.Events[0].FilterKeys[0])
require.Equal(t, structs.TypeACLRoleDeleted, receivedDeleteChange.Events[0].Type)
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)
eventPayload = receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
require.Equal(t, aclRole, eventPayload.ACLRole)
}
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

View File

@ -183,18 +183,8 @@ func (s *StateStore) DeleteACLRolesByID(
defer txn.Abort()
for _, roleID := range roleIDs {
existing, err := txn.First(TableACLRoles, indexID, roleID)
if err != nil {
return fmt.Errorf("ACL role lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL role not found")
}
// Delete the existing entry from the table.
if err := txn.Delete(TableACLRoles, existing); err != nil {
return fmt.Errorf("ACL role deletion failed: %v", err)
if err := s.deleteACLRoleByIDTxn(txn, roleID); err != nil {
return err
}
}
@ -206,6 +196,26 @@ func (s *StateStore) DeleteACLRolesByID(
return txn.Commit()
}
// deleteACLRoleByIDTxn deletes a single ACL role from the state store using the
// provided write transaction. It is the responsibility of the caller to update
// the index table.
func (s *StateStore) deleteACLRoleByIDTxn(txn *txn, roleID string) error {
existing, err := txn.First(TableACLRoles, indexID, roleID)
if err != nil {
return fmt.Errorf("ACL role lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL role not found")
}
// Delete the existing entry from the table.
if err := txn.Delete(TableACLRoles, existing); err != nil {
return fmt.Errorf("ACL role deletion failed: %v", err)
}
return nil
}
// GetACLRoles returns an iterator that contains all ACL roles stored within
// state.
func (s *StateStore) GetACLRoles(ws memdb.WatchSet) (memdb.ResultIterator, error) {

View File

@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-memdb"
@ -213,22 +214,22 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return !aclAllowsSubscription(aclObj, sub.req)
})
case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent:
// Re-evaluate each subscription permission since a policy or
// role change may alter the permissions of the token being
// used for the subscription.
e.checkSubscriptionsAgainstACLChange()
}
}
}
}
// checkSubscriptionsAgainstPolicyChange iterates over the brokers
// subscriptions and evaluates whether the token used for the subscription is
// still valid. If it is not valid it closes the subscriptions belonging to the
// token.
//
// A lock must be held to iterate over the map of subscriptions.
func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
// checkSubscriptionsAgainstACLChange iterates over the brokers subscriptions
// and evaluates whether the token used for the subscription is still valid. A
// token may become invalid is the assigned policies or roles have been updated
// which removed the required permission. If the token is no long valid, the
// subscription is closed.
func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
e.mu.Lock()
defer e.mu.Unlock()
@ -257,14 +258,19 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {
}
}
func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
func aclObjFromSnapshotForTokenSecretID(
aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil {
return nil, err
}
if aclToken == nil {
return nil, errors.New("no token for secret ID")
return nil, structs.ErrTokenNotFound
}
if aclToken.IsExpired(time.Now().UTC()) {
return nil, structs.ErrTokenExpired
}
// Check if this is a management token
@ -272,7 +278,8 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *
return acl.ManagementACL, nil
}
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies))
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))
for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil || policy == nil {
@ -281,12 +288,34 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *
aclPolicies = append(aclPolicies, policy)
}
// Iterate all the token role links, so we can unpack these and identify
// the ACL policies.
for _, roleLink := range aclToken.Roles {
role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
if err != nil {
return nil, err
}
if role == nil {
continue
}
for _, policyLink := range role.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil || policy == nil {
return nil, errors.New("error finding acl policy")
}
aclPolicies = append(aclPolicies, policy)
}
}
return structs.CompileACLObject(aclCache, aclPolicies)
}
type ACLTokenProvider interface {
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error)
}
type ACLDelegate interface {

View File

@ -9,6 +9,8 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -174,17 +176,23 @@ type fakeACLTokenProvider struct {
policyErr error
token *structs.ACLToken
tokenErr error
role *structs.ACLRole
roleErr error
}
func (p *fakeACLTokenProvider) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) {
func (p *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) {
return p.token, p.tokenErr
}
func (p *fakeACLTokenProvider) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) {
func (p *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) {
return p.policy, p.policyErr
}
func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
func (p *fakeACLTokenProvider) GetACLRoleByID(_ memdb.WatchSet, _ string) (*structs.ACLRole, error) {
return p.role, p.roleErr
}
func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) {
ci.Parallel(t)
ctx, cancel := context.WithCancel(context.Background())
@ -578,6 +586,403 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
}
}
func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) {
ci.Parallel(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
// Generate a UUID to use in all tests for the token secret ID and the role
// ID.
tokenSecretID := uuid.Generate()
roleID := uuid.Generate()
cases := []struct {
name string
aclPolicy *structs.ACLPolicy
roleBeforePolicyLinks []*structs.ACLRolePolicyLink
roleAfterPolicyLinks []*structs.ACLRolePolicyLink
topics map[structs.Topic][]string
event structs.Event
policyEvent structs.Event
shouldUnsubscribe bool
initialSubErr bool
}{
{
name: "deployments access policy link removed",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "evaluations access policy link removed",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "allocations access policy link removed",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "nodes access policy link removed",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NodePolicy(acl.PolicyRead),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "deployment access no change",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "evaluations access no change",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "allocations access no change",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
acl.NamespaceCapabilityReadJob},
),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
{
name: "nodes access no change",
aclPolicy: &structs.ACLPolicy{
Name: "test-event-broker-acl-policy",
Rules: mock.NodePolicy(acl.PolicyRead),
},
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Build our fake token provider containing the relevant state
// objects and add this to our new delegate. Keeping the token
// provider setup separate means we can easily update its state.
tokenProvider := &fakeACLTokenProvider{
policy: tc.aclPolicy,
token: &structs.ACLToken{
SecretID: tokenSecretID,
Roles: []*structs.ACLTokenRoleLink{{ID: roleID}},
},
role: &structs.ACLRole{
ID: uuid.Short(),
Policies: []*structs.ACLRolePolicyLink{
{Name: tc.aclPolicy.Name},
},
},
}
aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
ns := structs.DefaultNamespace
if tc.event.Namespace != "" {
ns = tc.event.Namespace
}
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}},
Namespace: ns,
Token: tokenSecretID,
})
if tc.initialSubErr {
require.Error(t, err)
require.Nil(t, sub)
return
}
require.NoError(t, err)
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
require.NoError(t, err)
// Overwrite the ACL role policy links with the updated version
// which is expected to cause a change in the subscription.
tokenProvider.role.Policies = tc.roleAfterPolicyLinks
// Publish ACL event triggering subscription re-evaluation
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
// If we are expecting to unsubscribe consume the subscription
// until the expected error occurs.
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
if tc.shouldUnsubscribe {
for {
_, err = sub.Next(ctx)
if err != nil {
if err == context.DeadlineExceeded {
require.Fail(t, err.Error())
}
if err == ErrSubscriptionClosed {
break
}
}
}
} else {
_, err = sub.Next(ctx)
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
if tc.shouldUnsubscribe {
require.Equal(t, ErrSubscriptionClosed, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
ci.Parallel(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
cases := []struct {
name string
inputToken *structs.ACLToken
shouldExpire bool
}{
{
name: "token does not expire",
inputToken: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
Type: structs.ACLManagementToken,
},
shouldExpire: false,
},
{
name: "token does expire",
inputToken: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
Type: structs.ACLManagementToken,
},
shouldExpire: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Build our fake token provider containing the relevant state
// objects and add this to our new delegate. Keeping the token
// provider setup separate means we can easily update its state.
tokenProvider := &fakeACLTokenProvider{token: tc.inputToken}
aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
fakeNodeEvent := structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
}
fakeTokenEvent := structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}),
}
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}},
Token: tc.inputToken.SecretID,
})
require.NoError(t, err)
require.NotNil(t, sub)
// Publish an event and check that there is a new item in the
// subscription queue.
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}})
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
require.NoError(t, err)
// If the test states the token should expire, set the expiration
// time to a previous time.
if tc.shouldExpire {
tokenProvider.token.ExpirationTime = pointer.Of(
time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC),
)
}
// Publish some events to trigger re-evaluation of the subscription.
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}})
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}})
// If we are expecting to unsubscribe consume the subscription
// until the expected error occurs.
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
if tc.shouldExpire {
for {
if _, err = sub.Next(ctx); err != nil {
if err == context.DeadlineExceeded {
require.Fail(t, err.Error())
}
if err == ErrSubscriptionClosed {
break
}
}
}
} else {
_, err = sub.Next(ctx)
require.NoError(t, err)
}
})
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"golang.org/x/crypto/blake2b"
@ -232,6 +233,24 @@ func (a *ACLToken) IsExpired(t time.Time) bool {
return a.ExpirationTime.Before(t) || t.IsZero()
}
// HasRoles checks if a given set of role IDs are assigned to the ACL token. It
// does not account for management tokens, therefore it is the responsibility
// of the caller to perform this check, if required.
func (a *ACLToken) HasRoles(roleIDs []string) bool {
// Generate a set of role IDs that the token is assigned.
roleSet := set.FromFunc(a.Roles, func(roleLink *ACLTokenRoleLink) string { return roleLink.ID })
// Iterate the role IDs within the request and check whether these are
// present within the token assignment.
for _, roleID := range roleIDs {
if !roleSet.Contains(roleID) {
return false
}
}
return true
}
// ACLRole is an abstraction for the ACL system which allows the grouping of
// ACL policies into a single object. ACL tokens can be created and linked to
// a role; the token then inherits all the permissions granted by the policies.

View File

@ -297,6 +297,75 @@ func TestACLToken_IsExpired(t *testing.T) {
}
}
func TestACLToken_HasRoles(t *testing.T) {
testCases := []struct {
name string
inputToken *ACLToken
inputRoleIDs []string
expectedOutput bool
}{
{
name: "client token request all subset",
inputToken: &ACLToken{
Type: ACLClientToken,
Roles: []*ACLTokenRoleLink{
{ID: "foo"},
{ID: "bar"},
{ID: "baz"},
},
},
inputRoleIDs: []string{"foo", "bar", "baz"},
expectedOutput: true,
},
{
name: "client token request partial subset",
inputToken: &ACLToken{
Type: ACLClientToken,
Roles: []*ACLTokenRoleLink{
{ID: "foo"},
{ID: "bar"},
{ID: "baz"},
},
},
inputRoleIDs: []string{"foo", "baz"},
expectedOutput: true,
},
{
name: "client token request one subset",
inputToken: &ACLToken{
Type: ACLClientToken,
Roles: []*ACLTokenRoleLink{
{ID: "foo"},
{ID: "bar"},
{ID: "baz"},
},
},
inputRoleIDs: []string{"baz"},
expectedOutput: true,
},
{
name: "client token request no subset",
inputToken: &ACLToken{
Type: ACLClientToken,
Roles: []*ACLTokenRoleLink{
{ID: "foo"},
{ID: "bar"},
{ID: "baz"},
},
},
inputRoleIDs: []string{"new"},
expectedOutput: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualOutput := tc.inputToken.HasRoles(tc.inputRoleIDs)
require.Equal(t, tc.expectedOutput, actualOutput)
})
}
}
func TestACLRole_SetHash(t *testing.T) {
testCases := []struct {
name string

View File

@ -23,6 +23,7 @@ const (
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicACLRole Topic = "ACLRole"
TopicService Topic = "Service"
TopicAll Topic = "*"
@ -46,6 +47,8 @@ const (
TypeACLTokenUpserted = "ACLTokenUpserted"
TypeACLPolicyDeleted = "ACLPolicyDeleted"
TypeACLPolicyUpserted = "ACLPolicyUpserted"
TypeACLRoleDeleted = "ACLRoleDeleted"
TypeACLRoleUpserted = "ACLRoleUpserted"
TypeServiceRegistration = "ServiceRegistration"
TypeServiceDeregistration = "ServiceDeregistration"
)
@ -151,3 +154,9 @@ func (a *ACLTokenEvent) SecretID() string {
type ACLPolicyEvent struct {
ACLPolicy *ACLPolicy
}
// ACLRoleStreamEvent holds a newly updated or delete ACL role to be used as an
// event within the event stream.
type ACLRoleStreamEvent struct {
ACLRole *ACLRole
}

View File

@ -33,6 +33,7 @@ by default, requiring a management token.
| `*` | `management` |
| `ACLToken` | `management` |
| `ACLPolicy` | `management` |
| `ACLRole` | `management` |
| `Job` | `namespace:read-job` |
| `Allocation` | `namespace:read-job` |
| `Deployment` | `namespace:read-job` |
@ -67,6 +68,7 @@ by default, requiring a management token.
| ---------- | ------------------------------- |
| ACLToken | ACLToken |
| ACLPolicy | ACLPolicy |
| ACLRoles | ACLRole |
| Allocation | Allocation (no job information) |
| Job | Job |
| Evaluation | Evaluation |
@ -83,6 +85,8 @@ by default, requiring a management token.
| ACLTokenDeleted |
| ACLPolicyUpserted |
| ACLPolicyDeleted |
| ACLRoleUpserted |
| ACLRoleDeleted |
| AllocationCreated |
| AllocationUpdated |
| AllocationUpdateDesiredStatus |