From 2020e9c7c7920cee57d3396824af1cc50d2ca46d Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 15 Jun 2020 18:49:00 -0400 Subject: [PATCH] ProcessChanges: use stream.Event Also remove secretHash, which was used to hash tokens. We don't expose these tokens anywhere, so we can use the string itself instead of a Hash. Fix acl_events_test.go for storing a structs type. --- agent/consul/state/acl.go | 6 +- agent/consul/state/acl_events.go | 85 ++--- agent/consul/state/acl_events_test.go | 363 ++++++++------------- agent/consul/state/acl_oss.go | 6 +- agent/consul/state/event_publisher.go | 89 ++--- agent/consul/state/event_publisher_test.go | 95 +++--- agent/consul/state/stream_topics.go | 11 +- agent/consul/stream/event.go | 9 +- agent/consul/stream/event_snapshot_test.go | 2 +- agent/consul/stream/subscription.go | 1 + 10 files changed, 261 insertions(+), 406 deletions(-) diff --git a/agent/consul/state/acl.go b/agent/consul/state/acl.go index c1b6e6aa0..2783ec1ad 100644 --- a/agent/consul/state/acl.go +++ b/agent/consul/state/acl.go @@ -869,11 +869,11 @@ func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role } } else if policy != "" && role == "" && methodName == "" { - iter, err = s.aclTokenListByPolicy(tx, policy, entMeta) + iter, err = aclTokenListByPolicy(tx, policy, entMeta) needLocalityFilter = true } else if policy == "" && role != "" && methodName == "" { - iter, err = s.aclTokenListByRole(tx, role, entMeta) + iter, err = aclTokenListByRole(tx, role, entMeta) needLocalityFilter = true } else if policy == "" && role == "" && methodName != "" { @@ -1464,7 +1464,7 @@ func (s *Store) ACLRoleList(ws memdb.WatchSet, policy string, entMeta *structs.E var err error if policy != "" { - iter, err = s.aclRoleListByPolicy(tx, policy, entMeta) + iter, err = aclRoleListByPolicy(tx, policy, entMeta) } else { iter, err = s.aclRoleList(tx, entMeta) } diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index f033e880f..94dbfb546 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -1,81 +1,54 @@ package state import ( - "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" ) // ACLEventsFromChanges returns all the ACL token, policy or role events that // should be emitted given a set of changes to the state store. -func (s *Store) ACLEventsFromChanges(tx *txn, changes memdb.Changes) ([]agentpb.Event, error) { - - // Don't allocate yet since in majority of update transactions no ACL token - // will be changed. - var events []agentpb.Event - - getObj := func(change memdb.Change) interface{} { - if change.Deleted() { - return change.Before - } - return change.After - } - - getOp := func(change memdb.Change) agentpb.ACLOp { - if change.Deleted() { - return agentpb.ACLOp_Delete - } - return agentpb.ACLOp_Update - } +// TODO: Add OpDelete/OpUpdate to the event or payload? +func aclEventsFromChanges(tx *txn, changes memdb.Changes) ([]stream.Event, error) { + var events []stream.Event + // TODO: mapping of table->topic? for _, change := range changes { switch change.Table { case "acl-tokens": - token := getObj(change).(*structs.ACLToken) - e := agentpb.Event{ - Topic: agentpb.Topic_ACLTokens, - Index: tx.Index, - Payload: &agentpb.Event_ACLToken{ - ACLToken: &agentpb.ACLTokenUpdate{ - Op: getOp(change), - Token: &agentpb.ACLTokenIdentifier{ - AccessorID: token.AccessorID, - SecretID: token.SecretID, - }, - }, - }, + token := changeObject(change).(*structs.ACLToken) + e := stream.Event{ + Topic: stream.Topic_ACLTokens, + Index: tx.Index, + Payload: token, } events = append(events, e) case "acl-policies": - policy := getObj(change).(*structs.ACLPolicy) - e := agentpb.Event{ - Topic: agentpb.Topic_ACLPolicies, - Index: tx.Index, - Payload: &agentpb.Event_ACLPolicy{ - ACLPolicy: &agentpb.ACLPolicyUpdate{ - Op: getOp(change), - PolicyID: policy.ID, - }, - }, + policy := changeObject(change).(*structs.ACLPolicy) + e := stream.Event{ + Topic: stream.Topic_ACLPolicies, + Index: tx.Index, + Payload: policy, } events = append(events, e) case "acl-roles": - role := getObj(change).(*structs.ACLRole) - e := agentpb.Event{ - Topic: agentpb.Topic_ACLRoles, - Index: tx.Index, - Payload: &agentpb.Event_ACLRole{ - ACLRole: &agentpb.ACLRoleUpdate{ - Op: getOp(change), - RoleID: role.ID, - }, - }, + role := changeObject(change).(*structs.ACLRole) + e := stream.Event{ + Topic: stream.Topic_ACLRoles, + Index: tx.Index, + Payload: role, } events = append(events, e) - default: - continue } } - return events, nil } + +// changeObject returns the object before it was deleted if the change was a delete, +// otherwise returns the object after the change. +func changeObject(change memdb.Change) interface{} { + if change.Deleted() { + return change.Before + } + return change.After +} diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index e471042b8..52bc5f9e7 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -5,291 +5,121 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent/agentpb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" ) -func testACLTokenEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event { - t.Helper() - uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", - strconv.Itoa(n)) - op := agentpb.ACLOp_Update - if delete { - op = agentpb.ACLOp_Delete - } - return agentpb.Event{ - Topic: agentpb.Topic_ACLTokens, - Index: idx, - Payload: &agentpb.Event_ACLToken{ - ACLToken: &agentpb.ACLTokenUpdate{ - Op: op, - Token: &agentpb.ACLTokenIdentifier{ - AccessorID: uuid, - SecretID: uuid, - }, - }, - }, - } -} - -func testACLPolicyEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event { - t.Helper() - uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", - strconv.Itoa(n)) - op := agentpb.ACLOp_Update - if delete { - op = agentpb.ACLOp_Delete - } - return agentpb.Event{ - Topic: agentpb.Topic_ACLPolicies, - Index: idx, - Payload: &agentpb.Event_ACLPolicy{ - ACLPolicy: &agentpb.ACLPolicyUpdate{ - Op: op, - PolicyID: uuid, - }, - }, - } -} - -func testACLRoleEvent(t *testing.T, idx uint64, n int, delete bool) agentpb.Event { - t.Helper() - uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", - strconv.Itoa(n)) - op := agentpb.ACLOp_Update - if delete { - op = agentpb.ACLOp_Delete - } - return agentpb.Event{ - Topic: agentpb.Topic_ACLRoles, - Index: idx, - Payload: &agentpb.Event_ACLRole{ - ACLRole: &agentpb.ACLRoleUpdate{ - Op: op, - RoleID: uuid, - }, - }, - } -} - -func testToken(t *testing.T, n int) *structs.ACLToken { - uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", - strconv.Itoa(n)) - return &structs.ACLToken{ - AccessorID: uuid, - SecretID: uuid, - } -} - -func testPolicy(t *testing.T, n int) *structs.ACLPolicy { - numStr := strconv.Itoa(n) - uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", numStr) - return &structs.ACLPolicy{ - ID: uuid, - Name: "test_policy_" + numStr, - Rules: `operator = "read"`, - } -} - -func testRole(t *testing.T, n, p int) *structs.ACLRole { - numStr := strconv.Itoa(n) - uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", numStr) - policy := testPolicy(t, p) - return &structs.ACLRole{ - ID: uuid, - Name: "test_role_" + numStr, - Policies: []structs.ACLRolePolicyLink{{ - ID: policy.ID, - Name: policy.Name, - }}, - } -} - func TestACLEventsFromChanges(t *testing.T) { cases := []struct { - Name string - Setup func(s *Store, tx *txn) error - Mutate func(s *Store, tx *txn) error - WantEvents []agentpb.Event - WantErr bool + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + expected stream.Event }{ { Name: "token create", Mutate: func(s *Store, tx *txn) error { - if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { - return err - } - return nil + return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false) }, - WantEvents: []agentpb.Event{ - testACLTokenEvent(t, 100, 1, false), - }, - WantErr: false, + expected: newACLTokenEvent(100, 1), }, { Name: "token update", Setup: func(s *Store, tx *txn) error { - if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { - return err - } - return nil + return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false) }, Mutate: func(s *Store, tx *txn) error { // Add a policy to the token (never mind it doesn't exist for now) we // allow it in the set command below. - token := testToken(t, 1) + token := newACLToken(1) token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}} - if err := s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false); err != nil { - return err - } - return nil + return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false) }, - WantEvents: []agentpb.Event{ - // Should see an event from the update - testACLTokenEvent(t, 100, 1, false), - }, - WantErr: false, + expected: newACLTokenEvent(100, 1, structs.ACLTokenPolicyLink{ID: "33333333-1111-1111-1111-111111111111"}), }, { Name: "token delete", Setup: func(s *Store, tx *txn) error { - if err := s.aclTokenSetTxn(tx, tx.Index, testToken(t, 1), false, false, false, false); err != nil { - return err - } - return nil + return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false) }, Mutate: func(s *Store, tx *txn) error { - // Delete it - token := testToken(t, 1) - if err := s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil); err != nil { - return err - } - return nil + token := newACLToken(1) + return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil) }, - WantEvents: []agentpb.Event{ - // Should see a delete event - testACLTokenEvent(t, 100, 1, true), - }, - WantErr: false, + expected: newACLTokenEvent(100, 1), }, { Name: "policy create", Mutate: func(s *Store, tx *txn) error { - if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { - return err - } - return nil + return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) }, - WantEvents: []agentpb.Event{ - testACLPolicyEvent(t, 100, 1, false), - }, - WantErr: false, + expected: newACLPolicyEvent(100, 1), }, { Name: "policy update", Setup: func(s *Store, tx *txn) error { - if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { - return err - } - return nil + return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) }, Mutate: func(s *Store, tx *txn) error { - policy := testPolicy(t, 1) + policy := newACLPolicy(1) policy.Rules = `operator = "write"` - if err := s.aclPolicySetTxn(tx, tx.Index, policy); err != nil { - return err - } - return nil + return s.aclPolicySetTxn(tx, tx.Index, policy) }, - WantEvents: []agentpb.Event{ - // Should see an event from the update - testACLPolicyEvent(t, 100, 1, false), + expected: stream.Event{ + Topic: stream.Topic_ACLPolicies, + Index: 100, + Payload: &structs.ACLPolicy{ + ID: "22222222-1111-1111-1111-111111111111", + Name: "test_policy_1", + Rules: `operator = "write"`, + }, }, - WantErr: false, }, { Name: "policy delete", Setup: func(s *Store, tx *txn) error { - if err := s.aclPolicySetTxn(tx, tx.Index, testPolicy(t, 1)); err != nil { - return err - } - return nil + return s.aclPolicySetTxn(tx, tx.Index, newACLPolicy(1)) }, Mutate: func(s *Store, tx *txn) error { - // Delete it - policy := testPolicy(t, 1) - if err := s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil); err != nil { - return err - } - return nil + policy := newACLPolicy(1) + return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil) }, - WantEvents: []agentpb.Event{ - // Should see a delete event - testACLPolicyEvent(t, 100, 1, true), - }, - WantErr: false, + expected: newACLPolicyEvent(100, 1), }, { Name: "role create", Mutate: func(s *Store, tx *txn) error { - if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { - return err - } - return nil + return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) }, - WantEvents: []agentpb.Event{ - testACLRoleEvent(t, 100, 1, false), - }, - WantErr: false, + expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)), }, { Name: "role update", Setup: func(s *Store, tx *txn) error { - if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { - return err - } - return nil + return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) }, Mutate: func(s *Store, tx *txn) error { - role := testRole(t, 1, 1) - policy2 := testPolicy(t, 2) + role := newACLRole(1, newACLRolePolicyLink(1)) + policy2 := newACLPolicy(2) role.Policies = append(role.Policies, structs.ACLRolePolicyLink{ ID: policy2.ID, Name: policy2.Name, }) - if err := s.aclRoleSetTxn(tx, tx.Index, role, true); err != nil { - return err - } - return nil + return s.aclRoleSetTxn(tx, tx.Index, role, true) }, - WantEvents: []agentpb.Event{ - // Should see an event from the update - testACLRoleEvent(t, 100, 1, false), - }, - WantErr: false, + expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1), newACLRolePolicyLink(2)), }, { Name: "role delete", Setup: func(s *Store, tx *txn) error { - if err := s.aclRoleSetTxn(tx, tx.Index, testRole(t, 1, 1), true); err != nil { - return err - } - return nil + return s.aclRoleSetTxn(tx, tx.Index, newACLRole(1, newACLRolePolicyLink(1)), true) }, Mutate: func(s *Store, tx *txn) error { - // Delete it - role := testRole(t, 1, 1) - if err := s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil); err != nil { - return err - } - return nil + role := newACLRole(1, newACLRolePolicyLink(1)) + return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil) }, - WantEvents: []agentpb.Event{ - // Should see a delete event - testACLRoleEvent(t, 100, 1, true), - }, - WantErr: false, + expected: newACLRoleEvent(100, 1, newACLRolePolicyLink(1)), }, } @@ -315,28 +145,95 @@ func TestACLEventsFromChanges(t *testing.T) { // Note we call the func under test directly rather than publishChanges so // we can test this in isolation. - got, err := s.ACLEventsFromChanges(tx, tx.Changes()) - if tc.WantErr { - require.Error(t, err) - return - } + events, err := aclEventsFromChanges(tx, tx.Changes()) require.NoError(t, err) - // Make sure we have the right events, only taking ordering into account - // where it matters to account for non-determinism. - requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e agentpb.Event) string { - // We only care that events affecting the same actual token are ordered - // with respect ot each other so use it's ID as the key. - switch v := e.Payload.(type) { - case *agentpb.Event_ACLToken: - return "token:" + v.ACLToken.Token.AccessorID - case *agentpb.Event_ACLPolicy: - return "policy:" + v.ACLPolicy.PolicyID - case *agentpb.Event_ACLRole: - return "role:" + v.ACLRole.RoleID - } - return "" - }) + require.Len(t, events, 1) + actual := events[0] + // ignore modified and created index because we don't set them in our expected values + // TODO: gotest.tools/assert would make this easier + normalizePayload(&actual) + require.Equal(t, tc.expected, actual) }) } } + +func normalizePayload(s *stream.Event) { + switch s := s.Payload.(type) { + case *structs.ACLToken: + s.ModifyIndex = 0 + s.CreateIndex = 0 + s.Hash = nil + case *structs.ACLPolicy: + s.ModifyIndex = 0 + s.CreateIndex = 0 + case *structs.ACLRole: + s.ModifyIndex = 0 + s.CreateIndex = 0 + } +} + +func newACLTokenEvent(idx uint64, n int, policies ...structs.ACLTokenPolicyLink) stream.Event { + uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(n)) + return stream.Event{ + Topic: stream.Topic_ACLTokens, + Index: idx, + Payload: &structs.ACLToken{ + AccessorID: uuid, + SecretID: uuid, + Policies: policies, + }, + } +} + +func newACLPolicyEvent(idx uint64, n int) stream.Event { + return stream.Event{ + Topic: stream.Topic_ACLPolicies, + Index: idx, + Payload: newACLPolicy(n), + } +} + +func newACLRoleEvent(idx uint64, n int, policies ...structs.ACLRolePolicyLink) stream.Event { + return stream.Event{ + Topic: stream.Topic_ACLRoles, + Index: idx, + Payload: newACLRole(n, policies...), + } +} + +func newACLToken(n int) *structs.ACLToken { + uuid := strings.ReplaceAll("11111111-????-????-????-????????????", "?", strconv.Itoa(n)) + return &structs.ACLToken{ + AccessorID: uuid, + SecretID: uuid, + } +} + +func newACLPolicy(n int) *structs.ACLPolicy { + numStr := strconv.Itoa(n) + uuid := strings.ReplaceAll("22222222-????-????-????-????????????", "?", numStr) + return &structs.ACLPolicy{ + ID: uuid, + Name: "test_policy_" + numStr, + Rules: `operator = "read"`, + } +} + +func newACLRole(n int, policies ...structs.ACLRolePolicyLink) *structs.ACLRole { + numStr := strconv.Itoa(n) + uuid := strings.ReplaceAll("33333333-????-????-????-????????????", "?", numStr) + return &structs.ACLRole{ + ID: uuid, + Name: "test_role_" + numStr, + Policies: policies, + } +} + +func newACLRolePolicyLink(n int) structs.ACLRolePolicyLink { + policy := newACLPolicy(n) + return structs.ACLRolePolicyLink{ + ID: policy.ID, + Name: policy.Name, + } +} diff --git a/agent/consul/state/acl_oss.go b/agent/consul/state/acl_oss.go index fe872d3cd..9003e8d90 100644 --- a/agent/consul/state/acl_oss.go +++ b/agent/consul/state/acl_oss.go @@ -289,11 +289,11 @@ func (s *Store) aclTokenListGlobal(tx *txn, _ *structs.EnterpriseMeta) (memdb.Re return tx.Get("acl-tokens", "local", false) } -func (s *Store) aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "policies", policy) } -func (s *Store) aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclTokenListByRole(tx *txn, role string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-tokens", "roles", role) } @@ -355,7 +355,7 @@ func (s *Store) aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIte return tx.Get("acl-roles", "id") } -func (s *Store) aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { +func aclRoleListByPolicy(tx *txn, policy string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get("acl-roles", "policies", policy) } diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index 277e03032..a223733a5 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/go-memdb" - "golang.org/x/crypto/blake2b" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" @@ -175,83 +174,56 @@ func (e *EventPublisher) sendEvents(update commitUpdate) { func (e *EventPublisher) handleACLUpdate(tx *txn, event stream.Event) error { switch event.Topic { case stream.Topic_ACLTokens: - token := event.GetACLToken() - subs := e.subsByToken[secretHash(token.Token.SecretID)] - for _, sub := range subs { + token := event.Payload.(*structs.ACLToken) + for _, sub := range e.subsByToken[token.SecretID] { sub.CloseReload() } + case stream.Topic_ACLPolicies: - policy := event.GetACLPolicy() - // TODO(streaming) figure out how to thread method/ent meta here for - // namespace support in Ent. Probably need wildcard here? - tokens, err := e.store.aclTokenListByPolicy(tx, policy.PolicyID, nil) + policy := event.Payload.(*structs.ACLPolicy) + tokens, err := aclTokenListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) if err != nil { return err } - - // Loop through the tokens used by the policy. - for token := tokens.Next(); token != nil; token = tokens.Next() { - token := token.(*structs.ACLToken) - if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { - for _, sub := range subs { - sub.CloseReload() - } - } - } + e.closeSubscriptionsForTokens(tokens) // Find any roles using this policy so tokens with those roles can be reloaded. - roles, err := e.store.aclRoleListByPolicy(tx, policy.PolicyID, nil) + roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) if err != nil { return err } for role := roles.Next(); role != nil; role = roles.Next() { role := role.(*structs.ACLRole) - // TODO(streaming) figure out how to thread method/ent meta here for - // namespace support in Ent. - tokens, err := e.store.aclTokenListByRole(tx, role.ID, nil) + tokens, err := aclTokenListByRole(tx, role.ID, &policy.EnterpriseMeta) if err != nil { return err } - for token := tokens.Next(); token != nil; token = tokens.Next() { - token := token.(*structs.ACLToken) - if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { - for _, sub := range subs { - sub.CloseReload() - } - } - } + e.closeSubscriptionsForTokens(tokens) } case stream.Topic_ACLRoles: - role := event.GetACLRole() - // TODO(streaming) figure out how to thread method/ent meta here for - // namespace support in Ent. - tokens, err := e.store.aclTokenListByRole(tx, role.RoleID, nil) + role := event.Payload.(*structs.ACLRole) + tokens, err := aclTokenListByRole(tx, role.ID, &role.EnterpriseMeta) if err != nil { return err } - for token := tokens.Next(); token != nil; token = tokens.Next() { - token := token.(*structs.ACLToken) - if subs, ok := e.subsByToken[secretHash(token.SecretID)]; ok { - for _, sub := range subs { - sub.CloseReload() - } - } - } + e.closeSubscriptionsForTokens(tokens) } return nil } -// secretHash returns a 256-bit Blake2 hash of the given string. -func secretHash(token string) string { - hash, err := blake2b.New256(nil) - if err != nil { - panic(err) +// This method requires the EventPublisher.lock is held +func (e *EventPublisher) closeSubscriptionsForTokens(tokens memdb.ResultIterator) { + for token := tokens.Next(); token != nil; token = tokens.Next() { + token := token.(*structs.ACLToken) + if subs, ok := e.subsByToken[token.SecretID]; ok { + for _, sub := range subs { + sub.CloseReload() + } + } } - hash.Write([]byte(token)) - return string(hash.Sum(nil)) } // Subscribe returns a new stream.Subscription for the given request. A @@ -270,7 +242,7 @@ func (e *EventPublisher) Subscribe( // Ensure we know how to make a snapshot for this topic _, ok := topicRegistry[req.Topic] if !ok { - return nil, fmt.Errorf("unknown topic %s", req.Topic) + return nil, fmt.Errorf("unknown topic %d", req.Topic) } e.lock.Lock() @@ -288,7 +260,7 @@ func (e *EventPublisher) Subscribe( topicHead := buf.Head() var sub *stream.Subscription if req.Index > 0 && len(topicHead.Events) > 0 && topicHead.Events[0].Index == req.Index { - // No need for a snapshot just send the "end snapshot" message to signal to + // No need for a snapshot, send the "resume stream" message to signal to // client it's cache is still good. (note that this can be distinguished // from a legitimate empty snapshot due to the index matching the one the // client sent), then follow along from here in the topic. @@ -296,7 +268,7 @@ func (e *EventPublisher) Subscribe( Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: &stream.Event_ResumeStream{ResumeStream: true}, + Payload: stream.ResumeStream{}, } // Make a new buffer to send to the client containing the resume. buf := stream.NewEventBuffer() @@ -324,12 +296,10 @@ func (e *EventPublisher) Subscribe( sub = stream.NewSubscription(ctx, req, snap.Snap) } - // Add the subscription to the ACL token map. - tokenHash := secretHash(req.Token) - subsByToken, ok := e.subsByToken[tokenHash] + subsByToken, ok := e.subsByToken[req.Token] if !ok { subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription) - e.subsByToken[tokenHash] = subsByToken + e.subsByToken[req.Token] = subsByToken } subsByToken[req] = sub @@ -343,14 +313,13 @@ func (e *EventPublisher) Unsubscribe(req *stream.SubscribeRequest) { e.lock.Lock() defer e.lock.Unlock() - tokenHash := secretHash(req.Token) - subsByToken, ok := e.subsByToken[tokenHash] + subsByToken, ok := e.subsByToken[req.Token] if !ok { return } delete(subsByToken, req) if len(subsByToken) == 0 { - delete(e.subsByToken, tokenHash) + delete(e.subsByToken, req.Token) } } @@ -370,7 +339,7 @@ func (e *EventPublisher) getSnapshotLocked(req *stream.SubscribeRequest, topicHe // No snap or errored snap in cache, create a new one snapFn, ok := e.snapFns[req.Topic] if !ok { - return nil, fmt.Errorf("unknown topic %s", req.Topic) + return nil, fmt.Errorf("unknown topic %d", req.Topic) } snap = stream.NewEventSnapshot(req, topicHead, snapFn) diff --git a/agent/consul/state/event_publisher_test.go b/agent/consul/state/event_publisher_test.go index 5b85f0d03..a0141cc43 100644 --- a/agent/consul/state/event_publisher_test.go +++ b/agent/consul/state/event_publisher_test.go @@ -6,14 +6,13 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/agentpb" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/require" ) type nextResult struct { - Events []agentpb.Event + Events []stream.Event Err error } @@ -40,12 +39,12 @@ func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { case next := <-eventCh: require.NoError(t, next.Err) require.Len(t, next.Events, 1) - t.Fatalf("got unwanted event: %#v", next.Events[0].GetPayload()) + t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) case <-time.After(100 * time.Millisecond): } } -func assertEvent(t *testing.T, eventCh <-chan nextResult) *agentpb.Event { +func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { t.Helper() select { case next := <-eventCh: @@ -82,7 +81,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { select { case next := <-eventCh: if allowEOS { - if next.Err == nil && len(next.Events) == 1 && next.Events[0].GetEndOfSnapshot() { + if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { continue } } @@ -123,14 +122,16 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo // it assumes something lower down did that) and then wait for it to be reset // so we know the initial token write event has been sent out before // continuing... - subscription := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - sub, err := s.publisher.Subscribe(ctx, subscription) + + publisher := NewEventPublisher(s.db, 0, 0) + sub, err := publisher.Subscribe(ctx, subscription) require.NoError(t, err) eventCh := testRunSub(sub) @@ -144,7 +145,8 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo return token } -func TestPublisher_BasicPublish(t *testing.T) { +func TestEventPublisher_Publish_Success(t *testing.T) { + t.Skip("TODO: replace service registration with test events") t.Parallel() require := require.New(t) s := testStateStore(t) @@ -155,23 +157,24 @@ func TestPublisher_BasicPublish(t *testing.T) { require.NoError(s.EnsureRegistration(1, reg)) // Register the subscription. - subscription := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: reg.Service.Service, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - sub, err := s.publisher.Subscribe(ctx, subscription) + publisher := NewEventPublisher(s.db, 0, 0) + sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) eventCh := testRunSub(sub) // Stream should get the instance and then EndOfSnapshot e := assertEvent(t, eventCh) - sh := e.GetServiceHealth() + sh := e.Payload // TODO: examine payload, instead of not-nil check require.NotNil(sh, "expected service health event, got %v", e) e = assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot()) + require.True(e.IsEndOfSnapshot()) // Now subscriber should block waiting for updates assertNoEvent(t, eventCh) @@ -183,7 +186,7 @@ func TestPublisher_BasicPublish(t *testing.T) { // Subscriber should see registration e = assertEvent(t, eventCh) - sh = e.GetServiceHealth() + sh = e.Payload // TODO: examine payload, instead of not-nil check require.NotNil(sh, "expected service health event, got %v", e) } @@ -196,21 +199,23 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) { token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. - subscription := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - sub, err := s.publisher.Subscribe(ctx, subscription) + + publisher := NewEventPublisher(s.db, 0, 0) + sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) eventCh := testRunSub(sub) // Stream should get EndOfSnapshot e := assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot()) + require.True(e.IsEndOfSnapshot()) // Update an unrelated token. token2 := &structs.ACLToken{ @@ -237,19 +242,19 @@ func TestPublisher_ACLTokenUpdate(t *testing.T) { require.Equal(stream.ErrSubscriptionReload, err) // Register another subscription. - subscription2 := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription2 := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } - sub2, err := s.publisher.Subscribe(ctx, subscription2) + sub2, err := publisher.Subscribe(ctx, subscription2) require.NoError(err) eventCh2 := testRunSub(sub2) // Expect initial EoS e = assertEvent(t, eventCh2) - require.True(e.GetEndOfSnapshot()) + require.True(e.IsEndOfSnapshot()) // Delete the unrelated token. require.NoError(s.ACLTokenDeleteByAccessor(5, token2.AccessorID, nil)) @@ -274,21 +279,23 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. - subscription := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - sub, err := s.publisher.Subscribe(ctx, subscription) + + publisher := NewEventPublisher(s.db, 0, 0) + sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) eventCh := testRunSub(sub) // Ignore the end of snapshot event e := assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Update an unrelated policy. policy2 := structs.ACLPolicy{ @@ -319,19 +326,19 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { assertReset(t, eventCh, true) // Register another subscription. - subscription2 := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription2 := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } - sub, err = s.publisher.Subscribe(ctx, subscription2) + sub, err = publisher.Subscribe(ctx, subscription2) require.NoError(err) eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Delete the unrelated policy. require.NoError(s.ACLPolicyDeleteByID(5, testPolicyID_C, nil)) @@ -347,19 +354,19 @@ func TestPublisher_ACLPolicyUpdate(t *testing.T) { require.Equal(stream.ErrSubscriptionReload, err) // Register another subscription. - subscription3 := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription3 := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } - sub, err = s.publisher.Subscribe(ctx, subscription3) + sub, err = publisher.Subscribe(ctx, subscription3) require.NoError(err) eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Now update the policy used in role B, but not directly in the token. policy4 := structs.ACLPolicy{ @@ -385,21 +392,23 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) { token := createTokenAndWaitForACLEventPublish(t, s) // Register the subscription. - subscription := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - sub, err := s.publisher.Subscribe(ctx, subscription) + + publisher := NewEventPublisher(s.db, 0, 0) + sub, err := publisher.Subscribe(ctx, subscription) require.NoError(err) eventCh := testRunSub(sub) // Stream should get EndOfSnapshot e := assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot()) + require.True(e.IsEndOfSnapshot()) // Update an unrelated role (the token has role testRoleID_B). role := structs.ACLRole{ @@ -426,19 +435,19 @@ func TestPublisher_ACLRoleUpdate(t *testing.T) { assertReset(t, eventCh, false) // Register another subscription. - subscription2 := &agentpb.SubscribeRequest{ - Topic: agentpb.Topic_ServiceHealth, + subscription2 := &stream.SubscribeRequest{ + Topic: stream.Topic_ServiceHealth, Key: "nope", Token: token.SecretID, } - sub, err = s.publisher.Subscribe(ctx, subscription2) + sub, err = publisher.Subscribe(ctx, subscription2) require.NoError(err) eventCh = testRunSub(sub) // Ignore the end of snapshot event e = assertEvent(t, eventCh) - require.True(e.GetEndOfSnapshot(), "event should be a EoS got %v", e) + require.True(e.IsEndOfSnapshot(), "event should be a EoS got %v", e) // Delete the unrelated policy. require.NoError(s.ACLRoleDeleteByID(5, testRoleID_A, nil)) diff --git a/agent/consul/state/stream_topics.go b/agent/consul/state/stream_topics.go index 06ef5e002..1e8d8a57b 100644 --- a/agent/consul/state/stream_topics.go +++ b/agent/consul/state/stream_topics.go @@ -1,7 +1,6 @@ package state import ( - "github.com/hashicorp/consul/agent/agentpb" "github.com/hashicorp/consul/agent/consul/stream" memdb "github.com/hashicorp/go-memdb" ) @@ -23,12 +22,12 @@ type topicHandlers struct { var topicRegistry map[stream.Topic]topicHandlers func init() { - topicRegistry = map[agentpb.Topic]topicHandlers{ - agentpb.Topic_ServiceHealth: topicHandlers{ + topicRegistry = map[stream.Topic]topicHandlers{ + stream.Topic_ServiceHealth: topicHandlers{ Snapshot: (*Store).ServiceHealthSnapshot, ProcessChanges: (*Store).ServiceHealthEventsFromChanges, }, - agentpb.Topic_ServiceHealthConnect: topicHandlers{ + stream.Topic_ServiceHealthConnect: topicHandlers{ Snapshot: (*Store).ServiceHealthConnectSnapshot, // Note there is no ProcessChanges since Connect events are published by // the same event publisher as regular health events to avoid duplicating @@ -39,8 +38,8 @@ func init() { // ProcessChanges func to publish the partial events on ACL changes though // so that we can invalidate other subscriptions if their effective ACL // permissions change. - agentpb.Topic_ACLTokens: topicHandlers{ - ProcessChanges: (*Store).ACLEventsFromChanges, + stream.Topic_ACLTokens: topicHandlers{ + ProcessChanges: aclEventsFromChanges, }, // Note no ACLPolicies/ACLRoles defined yet because we publish all events // from one handler to save on iterating/filtering and duplicating code and diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 6fd83855b..29e99ee1a 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -3,6 +3,7 @@ package stream type Topic int32 // TODO: remove underscores +// TODO: type string instead of int? const ( Topic_ServiceHealth Topic = 0 Topic_ServiceHealthConnect Topic = 1 @@ -19,8 +20,14 @@ type Event struct { Payload interface{} } -func (e Event) isEndOfSnapshot() bool { +func (e Event) IsEndOfSnapshot() bool { return e.Payload == endOfSnapshot{} } +func (e Event) IsResumeStream() bool { + return e.Payload == ResumeStream{} +} + type endOfSnapshot struct{} + +type ResumeStream struct{} diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 4d6f4e6df..21164491d 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -131,7 +131,7 @@ func TestEventSnapshot(t *testing.T) { // We're done! break RECV } - case e.isEndOfSnapshot(): + case e.IsEndOfSnapshot(): snapDone = true default: payload, ok := e.Payload.(string) diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 5ca8eb75e..4f2fa1eee 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -128,6 +128,7 @@ func (s *Subscription) CloseReload() { } // Request returns the request object that started the subscription. +// TODO: remove func (s *Subscription) Request() *SubscribeRequest { return s.req }