state: Make handleACLUpdate async once again
So that we keep as much as possible out of the FSM commit hot path.
This commit is contained in:
parent
a92dab724d
commit
6e87e83d77
|
@ -7,42 +7,52 @@ import (
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ACLEventsFromChanges returns all the ACL token, policy or role events that
|
// aclChangeUnsubscribeEvent creates and returns stream.UnsubscribeEvents that
|
||||||
// should be emitted given a set of changes to the state store.
|
// are used to unsubscribe any subscriptions which match the tokens from the events.
|
||||||
// TODO: Add OpDelete/OpUpdate to the event or payload?
|
//
|
||||||
func aclEventsFromChanges(_ db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
|
// These are special events that will never be returned to a subscriber.
|
||||||
var events []stream.Event
|
func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Event, error) {
|
||||||
|
var secretIDs []string
|
||||||
|
|
||||||
// TODO: mapping of table->topic?
|
|
||||||
for _, change := range changes.Changes {
|
for _, change := range changes.Changes {
|
||||||
switch change.Table {
|
switch change.Table {
|
||||||
case "acl-tokens":
|
case "acl-tokens":
|
||||||
token := changeObject(change).(*structs.ACLToken)
|
token := changeObject(change).(*structs.ACLToken)
|
||||||
e := stream.Event{
|
secretIDs = append(secretIDs, token.SecretID)
|
||||||
Topic: stream.Topic_ACLTokens,
|
|
||||||
Index: changes.Index,
|
|
||||||
Payload: token,
|
|
||||||
}
|
|
||||||
events = append(events, e)
|
|
||||||
case "acl-policies":
|
|
||||||
policy := changeObject(change).(*structs.ACLPolicy)
|
|
||||||
e := stream.Event{
|
|
||||||
Topic: stream.Topic_ACLPolicies,
|
|
||||||
Index: changes.Index,
|
|
||||||
Payload: policy,
|
|
||||||
}
|
|
||||||
events = append(events, e)
|
|
||||||
case "acl-roles":
|
case "acl-roles":
|
||||||
role := changeObject(change).(*structs.ACLRole)
|
role := changeObject(change).(*structs.ACLRole)
|
||||||
e := stream.Event{
|
tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta)
|
||||||
Topic: stream.Topic_ACLRoles,
|
if err != nil {
|
||||||
Index: changes.Index,
|
return nil, err
|
||||||
Payload: role,
|
}
|
||||||
|
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
|
||||||
|
|
||||||
|
case "acl-policies":
|
||||||
|
policy := changeObject(change).(*structs.ACLPolicy)
|
||||||
|
tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
|
||||||
|
|
||||||
|
roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for role := roles.Next(); role != nil; role = roles.Next() {
|
||||||
|
role := role.(*structs.ACLRole)
|
||||||
|
|
||||||
|
tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
secretIDs = appendSecretIDsFromTokenIterator(secretIDs, tokens)
|
||||||
}
|
}
|
||||||
events = append(events, e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return events, nil
|
// TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent
|
||||||
|
return []stream.Event{stream.NewUnsubscribeEvent(secretIDs)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// changeObject returns the object before it was deleted if the change was a delete,
|
// changeObject returns the object before it was deleted if the change was a delete,
|
||||||
|
@ -53,3 +63,11 @@ func changeObject(change memdb.Change) interface{} {
|
||||||
}
|
}
|
||||||
return change.After
|
return change.After
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func appendSecretIDsFromTokenIterator(seq []string, tokens memdb.ResultIterator) []string {
|
||||||
|
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
||||||
|
token := token.(*structs.ACLToken)
|
||||||
|
seq = append(seq, token.SecretID)
|
||||||
|
}
|
||||||
|
return seq
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestACLEventsFromChanges(t *testing.T) {
|
func TestACLChangeUnsubscribeEvent(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
Name string
|
Name string
|
||||||
Setup func(s *Store, tx *txn) error
|
Setup func(s *Store, tx *txn) error
|
||||||
|
@ -23,7 +23,7 @@ func TestACLEventsFromChanges(t *testing.T) {
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
|
return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false)
|
||||||
},
|
},
|
||||||
expected: newACLTokenEvent(100, 1),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "token update",
|
Name: "token update",
|
||||||
|
@ -37,7 +37,7 @@ func TestACLEventsFromChanges(t *testing.T) {
|
||||||
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
|
token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}}
|
||||||
return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false)
|
return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false)
|
||||||
},
|
},
|
||||||
expected: newACLTokenEvent(100, 1, structs.ACLTokenPolicyLink{ID: "33333333-1111-1111-1111-111111111111"}),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "token delete",
|
Name: "token delete",
|
||||||
|
@ -48,58 +48,42 @@ func TestACLEventsFromChanges(t *testing.T) {
|
||||||
token := newACLToken(1)
|
token := newACLToken(1)
|
||||||
return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil)
|
return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil)
|
||||||
},
|
},
|
||||||
expected: newACLTokenEvent(100, 1),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "policy create",
|
Name: "policy create",
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: newACLPolicyWithSingleToken,
|
||||||
return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1))
|
// two identical tokens, because Mutate has two changes
|
||||||
},
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
|
||||||
expected: newACLPolicyEvent(100, 1),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "policy update",
|
Name: "policy update",
|
||||||
Setup: func(s *Store, tx *txn) error {
|
Setup: newACLPolicyWithSingleToken,
|
||||||
return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1))
|
|
||||||
},
|
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
policy := newACLPolicy(1)
|
policy := newACLPolicy(1)
|
||||||
policy.Rules = `operator = "write"`
|
policy.Rules = `operator = "write"`
|
||||||
return s.aclPolicySetTxn(tx, tx.Index, policy)
|
return s.aclPolicySetTxn(tx, tx.Index, policy)
|
||||||
},
|
},
|
||||||
expected: stream.Event{
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
Topic: stream.Topic_ACLPolicies,
|
|
||||||
Index: 100,
|
|
||||||
Payload: &structs.ACLPolicy{
|
|
||||||
ID: "22222222-1111-1111-1111-111111111111",
|
|
||||||
Name: "test_policy_1",
|
|
||||||
Rules: `operator = "write"`,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "policy delete",
|
Name: "policy delete",
|
||||||
Setup: func(s *Store, tx *txn) error {
|
Setup: newACLPolicyWithSingleToken,
|
||||||
return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1))
|
|
||||||
},
|
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
policy := newACLPolicy(1)
|
policy := newACLPolicy(1)
|
||||||
return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil)
|
return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil)
|
||||||
},
|
},
|
||||||
expected: newACLPolicyEvent(100, 1),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "role create",
|
Name: "role create",
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: newACLRoleWithSingleToken,
|
||||||
return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true)
|
// Two tokens with the same ID, because there are two changes in Mutate
|
||||||
},
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)),
|
||||||
expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)),
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "role update",
|
Name: "role update",
|
||||||
Setup: func(s *Store, tx *txn) error {
|
Setup: newACLRoleWithSingleToken,
|
||||||
return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true)
|
|
||||||
},
|
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
role := newACLRole(1, newACLRolePolicyLink(1))
|
role := newACLRole(1, newACLRolePolicyLink(1))
|
||||||
policy2 := newACLPolicy(2)
|
policy2 := newACLPolicy(2)
|
||||||
|
@ -109,18 +93,16 @@ func TestACLEventsFromChanges(t *testing.T) {
|
||||||
})
|
})
|
||||||
return s.aclRoleSetTxn(tx, tx.Index, role, true)
|
return s.aclRoleSetTxn(tx, tx.Index, role, true)
|
||||||
},
|
},
|
||||||
expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1), newACLRolePolicyLink(2)),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "role delete",
|
Name: "role delete",
|
||||||
Setup: func(s *Store, tx *txn) error {
|
Setup: newACLRoleWithSingleToken,
|
||||||
return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true)
|
|
||||||
},
|
|
||||||
Mutate: func(s *Store, tx *txn) error {
|
Mutate: func(s *Store, tx *txn) error {
|
||||||
role := newACLRole(1, newACLRolePolicyLink(1))
|
role := newACLRole(1, newACLRolePolicyLink(1))
|
||||||
return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil)
|
return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil)
|
||||||
},
|
},
|
||||||
expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)),
|
expected: stream.NewUnsubscribeEvent(newSecretIDs(1)),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,61 +128,43 @@ func TestACLEventsFromChanges(t *testing.T) {
|
||||||
|
|
||||||
// Note we call the func under test directly rather than publishChanges so
|
// Note we call the func under test directly rather than publishChanges so
|
||||||
// we can test this in isolation.
|
// we can test this in isolation.
|
||||||
events, err := aclEventsFromChanges(tx, db.Changes{Index: 100, Changes: tx.Changes()})
|
events, err := aclChangeUnsubscribeEvent(tx, db.Changes{Index: 100, Changes: tx.Changes()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Len(t, events, 1)
|
require.Len(t, events, 1)
|
||||||
actual := events[0]
|
actual := events[0]
|
||||||
// ignore modified and created index because we don't set them in our expected values
|
|
||||||
// TODO: gotest.tools/assert would make this easier
|
|
||||||
normalizePayload(&actual)
|
|
||||||
require.Equal(t, tc.expected, actual)
|
require.Equal(t, tc.expected, actual)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func normalizePayload(s *stream.Event) {
|
func newACLRoleWithSingleToken(s *Store, tx *txn) error {
|
||||||
switch s := s.Payload.(type) {
|
role := newACLRole(1, newACLRolePolicyLink(1))
|
||||||
case *structs.ACLToken:
|
if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil {
|
||||||
s.ModifyIndex = 0
|
return err
|
||||||
s.CreateIndex = 0
|
|
||||||
s.Hash = nil
|
|
||||||
case *structs.ACLPolicy:
|
|
||||||
s.ModifyIndex = 0
|
|
||||||
s.CreateIndex = 0
|
|
||||||
case *structs.ACLRole:
|
|
||||||
s.ModifyIndex = 0
|
|
||||||
s.CreateIndex = 0
|
|
||||||
}
|
}
|
||||||
|
token := newACLToken(1)
|
||||||
|
token.Roles = append(token.Roles, structs.ACLTokenRoleLink{ID: role.ID})
|
||||||
|
return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newACLTokenEvent(idx uint64, n int, policies ...structs.ACLTokenPolicyLink) stream.Event {
|
func newACLPolicyWithSingleToken(s *Store, tx *txn) error {
|
||||||
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(n))
|
policy := newACLPolicy(1)
|
||||||
return stream.Event{
|
if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil {
|
||||||
Topic: stream.Topic_ACLTokens,
|
return err
|
||||||
Index: idx,
|
|
||||||
Payload: &structs.ACLToken{
|
|
||||||
AccessorID: uuid,
|
|
||||||
SecretID: uuid,
|
|
||||||
Policies: policies,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
token := newACLToken(1)
|
||||||
|
token.Policies = append(token.Policies, structs.ACLTokenPolicyLink{ID: policy.ID})
|
||||||
|
return s.aclTokenSetTxn(tx, tx.Index, token, false, false, false, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newACLPolicyEvent(idx uint64, n int) stream.Event {
|
func newSecretIDs(ids ...int) []string {
|
||||||
return stream.Event{
|
result := make([]string, 0, len(ids))
|
||||||
Topic: stream.Topic_ACLPolicies,
|
for _, id := range ids {
|
||||||
Index: idx,
|
uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(id))
|
||||||
Payload: newACLPolicy(n),
|
result = append(result, uuid)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newACLRoleEvent(idx uint64, n int, policies ...structs.ACLRolePolicyLink) stream.Event {
|
|
||||||
return stream.Event{
|
|
||||||
Topic: stream.Topic_ACLRoles,
|
|
||||||
Index: idx,
|
|
||||||
Payload: newACLRole(n, policies...),
|
|
||||||
}
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func newACLToken(n int) *structs.ACLToken {
|
func newACLToken(n int) *structs.ACLToken {
|
||||||
|
|
|
@ -8,8 +8,6 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state/db"
|
"github.com/hashicorp/consul/agent/consul/state/db"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// EventPublisher receives changes events from Publish, and sends them to all
|
// EventPublisher receives changes events from Publish, and sends them to all
|
||||||
|
@ -47,7 +45,7 @@ type EventPublisher struct {
|
||||||
// publishCh is used to send messages from an active txn to a goroutine which
|
// publishCh is used to send messages from an active txn to a goroutine which
|
||||||
// publishes events, so that publishing can happen asynchronously from
|
// publishes events, so that publishing can happen asynchronously from
|
||||||
// the Commit call in the FSM hot path.
|
// the Commit call in the FSM hot path.
|
||||||
publishCh chan commitUpdate
|
publishCh chan changeEvents
|
||||||
|
|
||||||
handlers map[stream.Topic]TopicHandler
|
handlers map[stream.Topic]TopicHandler
|
||||||
}
|
}
|
||||||
|
@ -65,8 +63,7 @@ type subscriptions struct {
|
||||||
byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
|
byToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: rename
|
type changeEvents struct {
|
||||||
type commitUpdate struct {
|
|
||||||
events []stream.Event
|
events []stream.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +87,7 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl
|
||||||
snapCacheTTL: snapCacheTTL,
|
snapCacheTTL: snapCacheTTL,
|
||||||
topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
|
topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
|
||||||
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
|
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
|
||||||
publishCh: make(chan commitUpdate, 64),
|
publishCh: make(chan changeEvents, 64),
|
||||||
subscriptions: &subscriptions{
|
subscriptions: &subscriptions{
|
||||||
byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
|
byToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
|
||||||
},
|
},
|
||||||
|
@ -106,6 +103,8 @@ func NewEventPublisher(ctx context.Context, handlers map[stream.Topic]TopicHandl
|
||||||
// used from a goroutine. The caller should never use the tx once it has been
|
// used from a goroutine. The caller should never use the tx once it has been
|
||||||
// passed to PublishChanged.
|
// passed to PublishChanged.
|
||||||
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error {
|
func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error {
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
var events []stream.Event
|
var events []stream.Event
|
||||||
for topic, handler := range e.handlers {
|
for topic, handler := range e.handlers {
|
||||||
if handler.ProcessChanges != nil {
|
if handler.ProcessChanges != nil {
|
||||||
|
@ -117,29 +116,7 @@ func (e *EventPublisher) PublishChanges(tx db.ReadTxn, changes db.Changes) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: call tx.Abort when this is done with tx.
|
e.publishCh <- changeEvents{events: events}
|
||||||
for _, event := range events {
|
|
||||||
// If the event is an ACL update, treat it as a special case. Currently
|
|
||||||
// ACL update events are only used internally to recognize when a subscriber
|
|
||||||
// should reload its subscription.
|
|
||||||
if event.Topic == stream.Topic_ACLTokens ||
|
|
||||||
event.Topic == stream.Topic_ACLPolicies ||
|
|
||||||
event.Topic == stream.Topic_ACLRoles {
|
|
||||||
|
|
||||||
if err := e.subscriptions.handleACLUpdate(tx, event); err != nil {
|
|
||||||
// This seems pretty drastic? What would be better. It's not super safe
|
|
||||||
// to continue since we might have missed some ACL update and so leak
|
|
||||||
// data to unauthorized clients but crashing whole server also seems
|
|
||||||
// bad. I wonder if we could send a "reset" to all subscribers instead
|
|
||||||
// and effectively re-start all subscriptions to be on the safe side
|
|
||||||
// without just crashing?
|
|
||||||
// TODO(banks): reset all instead of panic?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
e.publishCh <- commitUpdate{events: events}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,9 +135,18 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
|
||||||
|
|
||||||
// sendEvents sends the given events to any applicable topic listeners, as well
|
// sendEvents sends the given events to any applicable topic listeners, as well
|
||||||
// as any ACL update events to cause affected listeners to reset their stream.
|
// as any ACL update events to cause affected listeners to reset their stream.
|
||||||
func (e *EventPublisher) sendEvents(update commitUpdate) {
|
func (e *EventPublisher) sendEvents(update changeEvents) {
|
||||||
|
for _, event := range update.events {
|
||||||
|
if unsubEvent, ok := event.Payload.(stream.UnsubscribePayload); ok {
|
||||||
|
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
eventsByTopic := make(map[stream.Topic][]stream.Event)
|
eventsByTopic := make(map[stream.Topic][]stream.Event)
|
||||||
for _, event := range update.events {
|
for _, event := range update.events {
|
||||||
|
if event.Topic == stream.TopicInternal {
|
||||||
|
continue
|
||||||
|
}
|
||||||
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
|
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,65 +170,6 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleACLUpdate handles an ACL token/policy/role update.
|
|
||||||
func (s *subscriptions) handleACLUpdate(tx db.ReadTxn, event stream.Event) error {
|
|
||||||
s.lock.RLock()
|
|
||||||
defer s.lock.RUnlock()
|
|
||||||
|
|
||||||
switch event.Topic {
|
|
||||||
case stream.Topic_ACLTokens:
|
|
||||||
token := event.Payload.(*structs.ACLToken)
|
|
||||||
for _, sub := range s.byToken[token.SecretID] {
|
|
||||||
sub.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
case stream.Topic_ACLPolicies:
|
|
||||||
policy := event.Payload.(*structs.ACLPolicy)
|
|
||||||
tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.closeSubscriptionsForTokens(tokens)
|
|
||||||
|
|
||||||
// Find any roles using this policy so tokens with those roles can be reloaded.
|
|
||||||
roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for role := roles.Next(); role != nil; role = roles.Next() {
|
|
||||||
role := role.(*structs.ACLRole)
|
|
||||||
|
|
||||||
tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.closeSubscriptionsForTokens(tokens)
|
|
||||||
}
|
|
||||||
|
|
||||||
case stream.Topic_ACLRoles:
|
|
||||||
role := event.Payload.(*structs.ACLRole)
|
|
||||||
tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.closeSubscriptionsForTokens(tokens)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method requires the subscriptions.lock.RLock is held (the read-only lock)
|
|
||||||
func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator) {
|
|
||||||
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
|
||||||
token := token.(*structs.ACLToken)
|
|
||||||
if subs, ok := s.byToken[token.SecretID]; ok {
|
|
||||||
for _, sub := range subs {
|
|
||||||
sub.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe returns a new stream.Subscription for the given request. A
|
// Subscribe returns a new stream.Subscription for the given request. A
|
||||||
// subscription will stream an initial snapshot of events matching the request
|
// subscription will stream an initial snapshot of events matching the request
|
||||||
// if required and then block until new events that modify the request occur, or
|
// if required and then block until new events that modify the request occur, or
|
||||||
|
@ -258,7 +185,7 @@ func (e *EventPublisher) Subscribe(
|
||||||
) (*stream.Subscription, error) {
|
) (*stream.Subscription, error) {
|
||||||
// Ensure we know how to make a snapshot for this topic
|
// Ensure we know how to make a snapshot for this topic
|
||||||
_, ok := e.handlers[req.Topic]
|
_, ok := e.handlers[req.Topic]
|
||||||
if !ok {
|
if !ok || req.Topic == stream.TopicInternal {
|
||||||
return nil, fmt.Errorf("unknown topic %d", req.Topic)
|
return nil, fmt.Errorf("unknown topic %d", req.Topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,6 +258,19 @@ func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscripti
|
||||||
subsByToken[req] = sub
|
subsByToken[req] = sub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
|
||||||
|
s.lock.RLock()
|
||||||
|
defer s.lock.RUnlock()
|
||||||
|
|
||||||
|
for _, secretID := range tokenSecretIDs {
|
||||||
|
if subs, ok := s.byToken[secretID]; ok {
|
||||||
|
for _, sub := range subs {
|
||||||
|
sub.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// unsubscribe must be called when a client is no longer interested in a
|
// unsubscribe must be called when a client is no longer interested in a
|
||||||
// subscription to free resources monitoring changes in it's ACL token.
|
// subscription to free resources monitoring changes in it's ACL token.
|
||||||
//
|
//
|
||||||
|
|
|
@ -133,8 +133,8 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]TopicHandler {
|
||||||
return idx, nil
|
return idx, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
stream.Topic_ACLTokens: {
|
stream.TopicInternal: {
|
||||||
ProcessChanges: aclEventsFromChanges,
|
ProcessChanges: aclChangeUnsubscribeEvent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,16 +7,8 @@ import (
|
||||||
// newTopicHandlers returns the default handlers for state change events.
|
// newTopicHandlers returns the default handlers for state change events.
|
||||||
func newTopicHandlers() map[stream.Topic]TopicHandler {
|
func newTopicHandlers() map[stream.Topic]TopicHandler {
|
||||||
return map[stream.Topic]TopicHandler{
|
return map[stream.Topic]TopicHandler{
|
||||||
// For now we don't actually support subscribing to ACL* topics externally
|
// TopicInternal is a special case for processors that handle events that are
|
||||||
// so these have no Snapshot methods yet. We do need to have a
|
// not for subscribers. They are used by the stream package.
|
||||||
// ProcessChanges func to publish the partial events on ACL changes though
|
stream.TopicInternal: {ProcessChanges: aclChangeUnsubscribeEvent},
|
||||||
// so that we can invalidate other subscriptions if their effective ACL
|
|
||||||
// permissions change.
|
|
||||||
stream.Topic_ACLTokens: {
|
|
||||||
ProcessChanges: aclEventsFromChanges,
|
|
||||||
},
|
|
||||||
// Note no ACLPolicies/ACLRoles defined yet because we publish all events
|
|
||||||
// from one handler to save on iterating/filtering and duplicating code and
|
|
||||||
// there are no snapshots for these yet per comment above.
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,9 @@ type Topic int32
|
||||||
// TODO: remove underscores
|
// TODO: remove underscores
|
||||||
// TODO: type string instead of int?
|
// TODO: type string instead of int?
|
||||||
const (
|
const (
|
||||||
Topic_ServiceHealth Topic = 0
|
TopicInternal Topic = 0
|
||||||
Topic_ServiceHealthConnect Topic = 1
|
Topic_ServiceHealth Topic = 1
|
||||||
Topic_ACLTokens Topic = 2
|
Topic_ServiceHealthConnect Topic = 2
|
||||||
Topic_ACLPolicies Topic = 3
|
|
||||||
Topic_ACLRoles Topic = 4
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO:
|
// TODO:
|
||||||
|
@ -31,3 +29,15 @@ func (e Event) IsResumeStream() bool {
|
||||||
type endOfSnapshot struct{}
|
type endOfSnapshot struct{}
|
||||||
|
|
||||||
type ResumeStream struct{}
|
type ResumeStream struct{}
|
||||||
|
|
||||||
|
// TODO: unexport once EventPublisher is in stream package
|
||||||
|
type UnsubscribePayload struct {
|
||||||
|
TokensSecretIDs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUnsubscribeEvent returns a special Event that is handled by the
|
||||||
|
// stream package, and is never sent to subscribers. It results in any subscriptions
|
||||||
|
// which match any of the TokenSecretIDs to be unsubscribed.
|
||||||
|
func NewUnsubscribeEvent(tokenSecretIDs []string) Event {
|
||||||
|
return Event{Payload: UnsubscribePayload{TokensSecretIDs: tokenSecretIDs}}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue