open-consul/agent/consul/state/config_entry_events.go
Thomas Eckert f198544270
Native API Gateway Config Entries (#15897)
* Stub Config Entries for Consul Native API Gateway (#15644)
* Add empty InlineCertificate struct and protobuf
* apigateway stubs
* Stub HTTPRoute in api pkg
* Stub HTTPRoute in structs pkg
* Simplify api.APIGatewayConfigEntry to be consistent w/ other entries
* Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry
* Add TCPRoute to MakeConfigEntry, return unique Kind
* Stub BoundAPIGatewayConfigEntry in agent
* Add RaftIndex to APIGatewayConfigEntry stub
* Add new config entry kinds to validation allow-list
* Add RaftIndex to other added config entry stubs
* Update usage metrics assertions to include new cfg entries
* Add Meta and acl.EnterpriseMeta to all new ConfigEntry types
* Remove unnecessary Services field from added config entry types
* Implement GetMeta(), GetEnterpriseMeta() for added config entry types
* Add meta field to proto, name consistently w/ existing config entries
* Format config_entry.proto
* Add initial implementation of CanRead + CanWrite for new config entry types
* Add unit tests for decoding of new config entry types
* Add unit tests for parsing of new config entry types
* Add unit tests for API Gateway config entry ACLs
* Return typed PermissionDeniedError on BoundAPIGateway CanWrite
* Add unit tests for added config entry ACLs
* Add BoundAPIGateway type to AllConfigEntryKinds
* Return proper kind from BoundAPIGateway
* Add docstrings for new config entry types
* Add missing config entry kinds to proto def
* Update usagemetrics_oss_test.go
* Use utility func for returning PermissionDeniedError
* EventPublisher subscriptions for Consul Native API Gateway (#15757)
* Create new event topics in subscribe proto
* Add tests for PBSubscribe func
* Make configs singular, add all configs to PBToStreamSubscribeRequest
* Add snapshot methods
* Add config_entry_events tests
* Add config entry kind to topic for new configs
* Add unit tests for snapshot methods
* Start adding integration test
* Test using the new controller code
* Update agent/consul/state/config_entry_events.go
* Check value of error
* Add controller stubs for API Gateway (#15837)
* update initial stub implementation
* move files, clean up mutex references
* Remove embed, use idiomatic names for constructors
* Remove stray file introduced in merge
* Add APIGateway validation (#15847)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* APIGateway InlineCertificate validation (#15856)
* Add APIGateway validation
* Add additional validations
* Add protobuf definitions
* Tabs to spaces
* Add API structs
* Move struct fields around a bit
* Add validation for InlineCertificate
* Fix ACL test
* APIGateway BoundAPIGateway validation (#15858)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* Add validation for BoundAPIGateway
* APIGateway TCPRoute validation (#15855)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Add TCPRoute normalization and validation
* Add forgotten Status
* Add some more field docs in api package
* Fix test
* Format imports
* Rename snapshot test variable names
* Add plumbing for Native API GW Subscriptions (#16003)

Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com>
Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com>
Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00

200 lines
6.9 KiB
Go

package state
import (
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbconfigentry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// configEntryKindToTopic maps a config entry kind to the stream.Topic on which
// events for entries of that kind are published. ConfigEntryEventsFromChanges
// emits no event for a kind that is missing from this map.
//
// Adding events for a new config entry kind? Remember to update ConfigEntryFromStructs and ConfigEntryToStructs.
var configEntryKindToTopic = map[string]stream.Topic{
structs.MeshConfig: EventTopicMeshConfig,
structs.ServiceResolver: EventTopicServiceResolver,
structs.IngressGateway: EventTopicIngressGateway,
structs.ServiceIntentions: EventTopicServiceIntentions,
structs.ServiceDefaults: EventTopicServiceDefaults,
structs.APIGateway: EventTopicAPIGateway,
structs.TCPRoute: EventTopicTCPRoute,
structs.HTTPRoute: EventTopicHTTPRoute,
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
}
// EventSubjectConfigEntry is a stream.Subject used to route and receive events
// for a specific config entry (kind is encoded in the topic).
type EventSubjectConfigEntry struct {
// Name is the config entry name; it forms the last segment of String().
Name string
// EnterpriseMeta supplies the partition and namespace segments of String().
EnterpriseMeta *acl.EnterpriseMeta
}
// String renders the subject as "<partition>/<namespace>/<name>", taking the
// partition and namespace from the subject's enterprise metadata.
func (s EventSubjectConfigEntry) String() string {
	partition := s.EnterpriseMeta.PartitionOrDefault()
	namespace := s.EnterpriseMeta.NamespaceOrDefault()
	return fmt.Sprintf("%s/%s/%s", partition, namespace, s.Name)
}
// EventPayloadConfigEntry is the stream.Event payload carrying a config entry
// together with the operation that was applied to it.
type EventPayloadConfigEntry struct {
// Op is the update operation reported to subscribers (upsert or delete).
Op pbsubscribe.ConfigEntryUpdate_UpdateOp
// Value is the config entry the event is about.
Value structs.ConfigEntry
}
// Subject returns the EventSubjectConfigEntry built from the name and
// enterprise metadata of the payload's config entry.
func (e EventPayloadConfigEntry) Subject() stream.Subject {
	entry := e.Value
	subject := EventSubjectConfigEntry{Name: entry.GetName()}
	subject.EnterpriseMeta = entry.GetEnterpriseMeta()
	return subject
}
// HasReadPermission reports whether authz may read the payload's config
// entry, i.e. whether the entry's CanRead check returns a nil error.
func (e EventPayloadConfigEntry) HasReadPermission(authz acl.Authorizer) bool {
	err := e.Value.CanRead(authz)
	return err == nil
}
// ToSubscriptionEvent converts the payload into a pbsubscribe.Event at index
// idx. The event carries a ConfigEntryUpdate holding the payload's op and the
// protobuf form of its config entry.
func (e EventPayloadConfigEntry) ToSubscriptionEvent(idx uint64) *pbsubscribe.Event {
	update := &pbsubscribe.ConfigEntryUpdate{
		Op:          e.Op,
		ConfigEntry: pbconfigentry.ConfigEntryFromStructs(e.Value),
	}
	return &pbsubscribe.Event{
		Index:   idx,
		Payload: &pbsubscribe.Event_ConfigEntry{ConfigEntry: update},
	}
}
// ConfigEntryEventsFromChanges builds the stream events for the config entry
// changes recorded in changes. Changes to other tables, and config entry kinds
// without a topic in configEntryKindToTopic, are skipped. Deleted entries are
// reported with the Delete op and all others with Upsert; every event carries
// changes.Index. The tx argument is not used and the returned error is always
// nil.
func ConfigEntryEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
	var out []stream.Event
	for _, change := range changes.Changes {
		if change.Table != tableConfigEntries {
			continue
		}

		entry := changeObject(change).(structs.ConfigEntry)
		topic, known := configEntryKindToTopic[entry.GetKind()]
		if !known {
			// No topic is registered for this kind, so nothing is published.
			continue
		}

		var op pbsubscribe.ConfigEntryUpdate_UpdateOp
		if change.Deleted() {
			op = pbsubscribe.ConfigEntryUpdate_Delete
		} else {
			op = pbsubscribe.ConfigEntryUpdate_Upsert
		}

		out = append(out, configEntryEvent(topic, changes.Index, op, entry))
	}
	return out, nil
}
// MeshConfigSnapshot is the stream.SnapshotFunc for mesh config entries. It
// delegates to configEntrySnapshot with the structs.MeshConfig kind.
func (s *Store) MeshConfigSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.MeshConfig
	return s.configEntrySnapshot(kind, req, buf)
}
// ServiceResolverSnapshot is the stream.SnapshotFunc for service-resolver
// config entries. It delegates to configEntrySnapshot with the
// structs.ServiceResolver kind.
func (s *Store) ServiceResolverSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.ServiceResolver
	return s.configEntrySnapshot(kind, req, buf)
}
// IngressGatewaySnapshot is the stream.SnapshotFunc for ingress-gateway config
// entries. It delegates to configEntrySnapshot with the structs.IngressGateway
// kind.
func (s *Store) IngressGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.IngressGateway
	return s.configEntrySnapshot(kind, req, buf)
}
// ServiceIntentionsSnapshot is the stream.SnapshotFunc for service-intentions
// config entries. It delegates to configEntrySnapshot with the
// structs.ServiceIntentions kind.
func (s *Store) ServiceIntentionsSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.ServiceIntentions
	return s.configEntrySnapshot(kind, req, buf)
}
// ServiceDefaultsSnapshot is the stream.SnapshotFunc for service-defaults
// config entries. It delegates to configEntrySnapshot with the
// structs.ServiceDefaults kind.
func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.ServiceDefaults
	return s.configEntrySnapshot(kind, req, buf)
}
// APIGatewaySnapshot is the stream.SnapshotFunc for api-gateway config
// entries. It delegates to configEntrySnapshot with the structs.APIGateway
// kind.
func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.APIGateway
	return s.configEntrySnapshot(kind, req, buf)
}
// TCPRouteSnapshot is the stream.SnapshotFunc for tcp-route config entries. It
// delegates to configEntrySnapshot with the structs.TCPRoute kind.
func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.TCPRoute
	return s.configEntrySnapshot(kind, req, buf)
}
// HTTPRouteSnapshot is the stream.SnapshotFunc for http-route config entries.
// It delegates to configEntrySnapshot with the structs.HTTPRoute kind.
func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.HTTPRoute
	return s.configEntrySnapshot(kind, req, buf)
}
// InlineCertificateSnapshot is the stream.SnapshotFunc for inline-certificate
// config entries. It delegates to configEntrySnapshot with the
// structs.InlineCertificate kind.
func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.InlineCertificate
	return s.configEntrySnapshot(kind, req, buf)
}
// BoundAPIGatewaySnapshot is the stream.SnapshotFunc for bound-api-gateway
// config entries. It delegates to configEntrySnapshot with the
// structs.BoundAPIGateway kind.
func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	kind := structs.BoundAPIGateway
	return s.configEntrySnapshot(kind, req, buf)
}
// configEntrySnapshot appends to buf one Upsert event for each config entry of
// the given kind selected by req.Subject, and returns the index reported by
// the state store lookup.
//
// An EventSubjectConfigEntry subject selects the single entry with that name
// and enterprise metadata. stream.SubjectWildcard selects all entries of the
// kind, looked up with structs.WildcardEnterpriseMetaInPartition. Any other
// subject yields an error. Nothing is appended to buf when no entry is found,
// and a lookup error is returned with a zero index.
func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
	var (
		index   uint64
		matched []structs.ConfigEntry
		err     error
	)

	subject, single := req.Subject.(EventSubjectConfigEntry)
	switch {
	case single:
		var entry structs.ConfigEntry
		index, entry, err = s.ConfigEntry(nil, kind, subject.Name, subject.EnterpriseMeta)
		if entry != nil {
			matched = append(matched, entry)
		}
	case req.Subject == stream.SubjectWildcard:
		wildcard := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier)
		index, matched, err = s.ConfigEntriesByKind(nil, kind, wildcard)
	default:
		return 0, fmt.Errorf("subject must be of type EventSubjectConfigEntry or be SubjectWildcard, was: %T", req.Subject)
	}
	if err != nil {
		return 0, err
	}

	// Only touch the snapshot buffer when there is something to publish.
	if len(matched) == 0 {
		return index, nil
	}

	topic := configEntryKindToTopic[kind]
	events := make([]stream.Event, 0, len(matched))
	for _, entry := range matched {
		events = append(events, configEntryEvent(topic, index, pbsubscribe.ConfigEntryUpdate_Upsert, entry))
	}
	buf.Append(events)
	return index, nil
}
// configEntryEvent wraps configEntry and op in an EventPayloadConfigEntry and
// returns it as a stream.Event for the given topic and index.
func configEntryEvent(topic stream.Topic, idx uint64, op pbsubscribe.ConfigEntryUpdate_UpdateOp, configEntry structs.ConfigEntry) stream.Event {
	payload := EventPayloadConfigEntry{Op: op, Value: configEntry}
	return stream.Event{Topic: topic, Index: idx, Payload: payload}
}