f198544270
* Stub Config Entries for Consul Native API Gateway (#15644) * Add empty InlineCertificate struct and protobuf * apigateway stubs * Stub HTTPRoute in api pkg * Stub HTTPRoute in structs pkg * Simplify api.APIGatewayConfigEntry to be consistent w/ other entries * Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry * Add TCPRoute to MakeConfigEntry, return unique Kind * Stub BoundAPIGatewayConfigEntry in agent * Add RaftIndex to APIGatewayConfigEntry stub * Add new config entry kinds to validation allow-list * Add RaftIndex to other added config entry stubs * Update usage metrics assertions to include new cfg entries * Add Meta and acl.EnterpriseMeta to all new ConfigEntry types * Remove unnecessary Services field from added config entry types * Implement GetMeta(), GetEnterpriseMeta() for added config entry types * Add meta field to proto, name consistently w/ existing config entries * Format config_entry.proto * Add initial implementation of CanRead + CanWrite for new config entry types * Add unit tests for decoding of new config entry types * Add unit tests for parsing of new config entry types * Add unit tests for API Gateway config entry ACLs * Return typed PermissionDeniedError on BoundAPIGateway CanWrite * Add unit tests for added config entry ACLs * Add BoundAPIGateway type to AllConfigEntryKinds * Return proper kind from BoundAPIGateway * Add docstrings for new config entry types * Add missing config entry kinds to proto def * Update usagemetrics_oss_test.go * Use utility func for returning PermissionDeniedError * EventPublisher subscriptions for Consul Native API Gateway (#15757) * Create new event topics in subscribe proto * Add tests for PBSubscribe func * Make configs singular, add all configs to PBToStreamSubscribeRequest * Add snapshot methods * Add config_entry_events tests * Add config entry kind to topic for new configs * Add unit tests for snapshot methods * Start adding integration test * Test using the new controller code * 
Update agent/consul/state/config_entry_events.go * Check value of error * Add controller stubs for API Gateway (#15837) * update initial stub implementation * move files, clean up mutex references * Remove embed, use idiomatic names for constructors * Remove stray file introduced in merge * Add APIGateway validation (#15847) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * APIGateway InlineCertificate validation (#15856) * Add APIGateway validation * Add additional validations * Add protobuf definitions * Tabs to spaces * Add API structs * Move struct fields around a bit * Add validation for InlineCertificate * Fix ACL test * APIGateway BoundAPIGateway validation (#15858) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Move struct fields around a bit * Add validation for BoundAPIGateway * APIGateway TCPRoute validation (#15855) * Add APIGateway validation * Add additional validations * Add cert ref validation * Add protobuf definitions * Fix up field types * Add API structs * Add TCPRoute normalization and validation * Add forgotten Status * Add some more field docs in api package * Fix test * Format imports * Rename snapshot test variable names * Add plumbing for Native API GW Subscriptions (#16003) Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com> Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
255 lines
7.6 KiB
Go
255 lines
7.6 KiB
Go
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/submatview"
|
|
"github.com/hashicorp/consul/proto/pbcommon"
|
|
"github.com/hashicorp/consul/proto/pbconfigentry"
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
)
|
|
|
|
// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
|
|
// data from the agent cache.
|
|
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {
|
|
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName}
|
|
}
|
|
|
|
// CacheConfigEntryList satisfies the proxycfg.ConfigEntryList interface by
|
|
// sourcing data from the agent cache.
|
|
func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList {
|
|
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName}
|
|
}
|
|
|
|
// ServerConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
|
|
// data from a local materialized view (backed by an EventPublisher subscription).
|
|
func ServerConfigEntry(deps ServerDataSourceDeps) proxycfg.ConfigEntry {
|
|
return serverConfigEntry{deps}
|
|
}
|
|
|
|
// ServerConfigEntryList satisfies the proxycfg.ConfigEntry interface by sourcing
|
|
// data from a local materialized view (backed by an EventPublisher subscription).
|
|
func ServerConfigEntryList(deps ServerDataSourceDeps) proxycfg.ConfigEntryList {
|
|
return serverConfigEntry{deps}
|
|
}
|
|
|
|
type serverConfigEntry struct {
|
|
deps ServerDataSourceDeps
|
|
}
|
|
|
|
func (e serverConfigEntry) Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
cfgReq, err := newConfigEntryRequest(req, e.deps)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return e.deps.ViewStore.NotifyCallback(ctx, cfgReq, correlationID, dispatchCacheUpdate(ch))
|
|
}
|
|
|
|
func newConfigEntryRequest(req *structs.ConfigEntryQuery, deps ServerDataSourceDeps) (*configEntryRequest, error) {
|
|
var topic pbsubscribe.Topic
|
|
switch req.Kind {
|
|
case structs.MeshConfig:
|
|
topic = pbsubscribe.Topic_MeshConfig
|
|
case structs.ServiceResolver:
|
|
topic = pbsubscribe.Topic_ServiceResolver
|
|
case structs.IngressGateway:
|
|
topic = pbsubscribe.Topic_IngressGateway
|
|
case structs.ServiceDefaults:
|
|
topic = pbsubscribe.Topic_ServiceDefaults
|
|
case structs.APIGateway:
|
|
topic = pbsubscribe.Topic_APIGateway
|
|
case structs.HTTPRoute:
|
|
topic = pbsubscribe.Topic_HTTPRoute
|
|
case structs.TCPRoute:
|
|
topic = pbsubscribe.Topic_TCPRoute
|
|
case structs.InlineCertificate:
|
|
topic = pbsubscribe.Topic_InlineCertificate
|
|
case structs.BoundAPIGateway:
|
|
topic = pbsubscribe.Topic_BoundAPIGateway
|
|
default:
|
|
return nil, fmt.Errorf("cannot map config entry kind: %s to a topic", req.Kind)
|
|
}
|
|
return &configEntryRequest{
|
|
topic: topic,
|
|
req: req,
|
|
deps: deps,
|
|
}, nil
|
|
}
|
|
|
|
type configEntryRequest struct {
|
|
topic pbsubscribe.Topic
|
|
req *structs.ConfigEntryQuery
|
|
deps ServerDataSourceDeps
|
|
}
|
|
|
|
// CacheInfo returns the cache request info of the wrapped query unchanged.
func (r *configEntryRequest) CacheInfo() cache.RequestInfo {
	return r.req.CacheInfo()
}
|
|
|
|
func (r *configEntryRequest) NewMaterializer() (submatview.Materializer, error) {
|
|
var view submatview.View
|
|
if r.req.Name == "" {
|
|
view = newConfigEntryListView(r.req.Kind, r.req.EnterpriseMeta)
|
|
} else {
|
|
view = &configEntryView{}
|
|
}
|
|
|
|
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
|
|
Backend: r.deps.EventPublisher,
|
|
ACLResolver: r.deps.ACLResolver,
|
|
Deps: submatview.Deps{
|
|
View: view,
|
|
Logger: r.deps.Logger,
|
|
Request: r.Request,
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
// Type returns the fixed type name used for config entry requests.
func (r *configEntryRequest) Type() string {
	return "proxycfgglue.ConfigEntry"
}
|
|
|
|
func (r *configEntryRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
|
|
req := &pbsubscribe.SubscribeRequest{
|
|
Topic: r.topic,
|
|
Index: index,
|
|
Datacenter: r.req.Datacenter,
|
|
Token: r.req.QueryOptions.Token,
|
|
}
|
|
|
|
if name := r.req.Name; name == "" {
|
|
req.Subject = &pbsubscribe.SubscribeRequest_WildcardSubject{
|
|
WildcardSubject: true,
|
|
}
|
|
} else {
|
|
req.Subject = &pbsubscribe.SubscribeRequest_NamedSubject{
|
|
NamedSubject: &pbsubscribe.NamedSubject{
|
|
Key: name,
|
|
Partition: r.req.PartitionOrDefault(),
|
|
Namespace: r.req.NamespaceOrDefault(),
|
|
},
|
|
}
|
|
}
|
|
|
|
return req
|
|
}
|
|
|
|
// configEntryView implements a submatview.View for a single config entry.
|
|
type configEntryView struct {
|
|
state structs.ConfigEntry
|
|
}
|
|
|
|
func (v *configEntryView) Reset() {
|
|
v.state = nil
|
|
}
|
|
|
|
func (v *configEntryView) Result(index uint64) any {
|
|
return &structs.ConfigEntryResponse{
|
|
QueryMeta: structs.QueryMeta{
|
|
Index: index,
|
|
Backend: structs.QueryBackendStreaming,
|
|
},
|
|
Entry: v.state,
|
|
}
|
|
}
|
|
|
|
func (v *configEntryView) Update(events []*pbsubscribe.Event) error {
|
|
for _, event := range events {
|
|
update := event.GetConfigEntry()
|
|
if update == nil {
|
|
continue
|
|
}
|
|
switch update.Op {
|
|
case pbsubscribe.ConfigEntryUpdate_Delete:
|
|
v.state = nil
|
|
case pbsubscribe.ConfigEntryUpdate_Upsert:
|
|
v.state = pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// configEntryListView implements a submatview.View for a list of config entries
|
|
// that are all of the same kind (name is treated as unique).
|
|
type configEntryListView struct {
|
|
kind string
|
|
entMeta acl.EnterpriseMeta
|
|
state map[string]structs.ConfigEntry
|
|
}
|
|
|
|
func newConfigEntryListView(kind string, entMeta acl.EnterpriseMeta) *configEntryListView {
|
|
view := &configEntryListView{kind: kind, entMeta: entMeta}
|
|
view.Reset()
|
|
return view
|
|
}
|
|
|
|
func (v *configEntryListView) Reset() {
|
|
v.state = make(map[string]structs.ConfigEntry)
|
|
}
|
|
|
|
func (v *configEntryListView) Result(index uint64) any {
|
|
entries := make([]structs.ConfigEntry, 0, len(v.state))
|
|
for _, entry := range v.state {
|
|
entries = append(entries, entry)
|
|
}
|
|
|
|
return &structs.IndexedConfigEntries{
|
|
Kind: v.kind,
|
|
Entries: entries,
|
|
QueryMeta: structs.QueryMeta{
|
|
Index: index,
|
|
Backend: structs.QueryBackendStreaming,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (v *configEntryListView) Update(events []*pbsubscribe.Event) error {
|
|
for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
|
|
update := event.GetConfigEntry()
|
|
configEntry := pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
|
|
name := structs.NewServiceName(configEntry.GetName(), configEntry.GetEnterpriseMeta()).String()
|
|
|
|
switch update.Op {
|
|
case pbsubscribe.ConfigEntryUpdate_Delete:
|
|
delete(v.state, name)
|
|
case pbsubscribe.ConfigEntryUpdate_Upsert:
|
|
v.state[name] = configEntry
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// filterByEnterpriseMeta filters the given set of events to remove those that
|
|
// don't match the request's enterprise meta - this is necessary because when
|
|
// subscribing to a topic with SubjectWildcard we'll get events for resources
|
|
// in all partitions and namespaces.
|
|
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
|
|
partition := entMeta.PartitionOrDefault()
|
|
namespace := entMeta.NamespaceOrDefault()
|
|
|
|
filtered := make([]*pbsubscribe.Event, 0, len(events))
|
|
for _, event := range events {
|
|
var eventEntMeta *pbcommon.EnterpriseMeta
|
|
switch payload := event.Payload.(type) {
|
|
case *pbsubscribe.Event_ConfigEntry:
|
|
eventEntMeta = payload.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
|
|
case *pbsubscribe.Event_Service:
|
|
eventEntMeta = payload.Service.GetEnterpriseMeta()
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if partition != acl.WildcardName && !acl.EqualPartitions(partition, eventEntMeta.GetPartition()) {
|
|
continue
|
|
}
|
|
if namespace != acl.WildcardName && !acl.EqualNamespaces(namespace, eventEntMeta.GetNamespace()) {
|
|
continue
|
|
}
|
|
|
|
filtered = append(filtered, event)
|
|
}
|
|
return filtered
|
|
}
|