open-consul/agent/consul/state/connect_ca_events.go
R.B. Boyer 809344a6f5
peering: initial sync (#12842)
- Add endpoints related to peering: read, list, generate token, initiate peering
- Update node/service/check table indexing to account for peers
- Foundational changes for pushing service updates to a peer
- Plumb peer name through Health.ServiceNodes path

see: ENT-1765, ENT-1280, ENT-1283, ENT-1283, ENT-1756, ENT-1739, ENT-1750, ENT-1679,
     ENT-1709, ENT-1704, ENT-1690, ENT-1689, ENT-1702, ENT-1701, ENT-1683, ENT-1663,
     ENT-1650, ENT-1678, ENT-1628, ENT-1658, ENT-1640, ENT-1637, ENT-1597, ENT-1634,
     ENT-1613, ENT-1616, ENT-1617, ENT-1591, ENT-1588, ENT-1596, ENT-1572, ENT-1555

Co-authored-by: R.B. Boyer <rb@hashicorp.com>
Co-authored-by: freddygv <freddy@hashicorp.com>
Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: Evan Culver <eculver@hashicorp.com>
Co-authored-by: Nitya Dhanushkodi <nitya@hashicorp.com>
2022-04-21 17:34:40 -05:00

85 lines
2.3 KiB
Go

package state
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// EventTopicCARoots is the streaming topic to which events will be published
// when the list of active CA Roots changes. Each event payload contains the
// full list of roots.
//
// Note: topics are ordinarily defined in subscribe.proto, but this one isn't
// currently available via the Subscribe endpoint.
const EventTopicCARoots stream.StringTopic = "CARoots"
type EventPayloadCARoots struct {
CARoots structs.CARoots
}
func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone }
func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool {
// Require `service:write` on any service in any partition and namespace.
var authzContext acl.AuthorizerContext
structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier).
FillAuthzContext(&authzContext)
return authz.ServiceWriteAny(&authzContext) == acl.Allow
}
func (e EventPayloadCARoots) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event {
panic("EventPayloadCARoots does not implement ToSubscriptionEvent")
}
// caRootsChangeEvents returns an event on EventTopicCARoots whenever the list
// of active CA Roots changes.
func caRootsChangeEvents(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var rootsChanged bool
for _, c := range changes.Changes {
if c.Table == tableConnectCARoots {
rootsChanged = true
break
}
}
if !rootsChanged {
return nil, nil
}
_, roots, err := caRootsTxn(tx, nil)
if err != nil {
return nil, err
}
return []stream.Event{
{
Topic: EventTopicCARoots,
Index: changes.Index,
Payload: EventPayloadCARoots{CARoots: roots},
},
}, nil
}
// caRootsSnapshot returns a stream.SnapshotFunc that provides a snapshot of
// the current active list of CA Roots.
func (s *Store) CARootsSnapshot(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
idx, roots, err := caRootsTxn(tx, nil)
if err != nil {
return 0, err
}
buf.Append([]stream.Event{
{
Topic: EventTopicCARoots,
Index: idx,
Payload: EventPayloadCARoots{CARoots: roots},
},
})
return idx, nil
}