160 lines
4.8 KiB
Go
160 lines
4.8 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/submatview"
|
|
"github.com/hashicorp/consul/proto/private/pbcommon"
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// CacheServiceList satisfies the proxycfg.ServiceList interface by sourcing
|
|
// data from the agent cache.
|
|
func CacheServiceList(c *cache.Cache) proxycfg.ServiceList {
|
|
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName}
|
|
}
|
|
|
|
func ServerServiceList(deps ServerDataSourceDeps, remoteSource proxycfg.ServiceList) proxycfg.ServiceList {
|
|
return &serverServiceList{deps, remoteSource}
|
|
}
|
|
|
|
type serverServiceList struct {
|
|
deps ServerDataSourceDeps
|
|
remoteSource proxycfg.ServiceList
|
|
}
|
|
|
|
func (s *serverServiceList) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
if req.Datacenter != s.deps.Datacenter {
|
|
return s.remoteSource.Notify(ctx, req, correlationID, ch)
|
|
}
|
|
return s.deps.ViewStore.NotifyCallback(
|
|
ctx,
|
|
&serviceListRequest{s.deps, req},
|
|
correlationID,
|
|
dispatchCacheUpdate(ch),
|
|
)
|
|
}
|
|
|
|
type serviceListRequest struct {
|
|
deps ServerDataSourceDeps
|
|
req *structs.DCSpecificRequest
|
|
}
|
|
|
|
func (r *serviceListRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
|
|
return &pbsubscribe.SubscribeRequest{
|
|
Topic: pbsubscribe.Topic_ServiceList,
|
|
Subject: &pbsubscribe.SubscribeRequest_WildcardSubject{WildcardSubject: true},
|
|
Index: index,
|
|
Datacenter: r.req.Datacenter,
|
|
Token: r.req.QueryOptions.Token,
|
|
}
|
|
}
|
|
|
|
func (r *serviceListRequest) CacheInfo() cache.RequestInfo { return r.req.CacheInfo() }
|
|
|
|
func (r *serviceListRequest) NewMaterializer() (submatview.Materializer, error) {
|
|
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
|
|
Backend: r.deps.EventPublisher,
|
|
ACLResolver: r.deps.ACLResolver,
|
|
Deps: submatview.Deps{
|
|
View: newServiceListView(r.req.EnterpriseMeta),
|
|
Logger: r.deps.Logger,
|
|
Request: r.Request,
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
func (serviceListRequest) Type() string { return "proxycfgglue.ServiceList" }
|
|
|
|
func newServiceListView(entMeta acl.EnterpriseMeta) *serviceListView {
|
|
view := &serviceListView{entMeta: entMeta}
|
|
view.Reset()
|
|
return view
|
|
}
|
|
|
|
type serviceListView struct {
|
|
entMeta acl.EnterpriseMeta
|
|
state map[string]structs.ServiceName
|
|
}
|
|
|
|
func (v *serviceListView) Reset() { v.state = make(map[string]structs.ServiceName) }
|
|
|
|
func (v *serviceListView) Update(events []*pbsubscribe.Event) error {
|
|
for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
|
|
update := event.GetService()
|
|
if update == nil {
|
|
continue
|
|
}
|
|
|
|
var entMeta acl.EnterpriseMeta
|
|
pbcommon.EnterpriseMetaToStructs(update.EnterpriseMeta, &entMeta)
|
|
name := structs.NewServiceName(update.Name, &entMeta)
|
|
|
|
switch update.Op {
|
|
case pbsubscribe.CatalogOp_Register:
|
|
v.state[name.String()] = name
|
|
case pbsubscribe.CatalogOp_Deregister:
|
|
delete(v.state, name.String())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *serviceListView) Result(index uint64) any {
|
|
serviceList := make(structs.ServiceList, 0, len(v.state))
|
|
for _, name := range v.state {
|
|
serviceList = append(serviceList, name)
|
|
}
|
|
sort.Slice(serviceList, func(a, b int) bool {
|
|
return serviceList[a].String() < serviceList[b].String()
|
|
})
|
|
return &structs.IndexedServiceList{
|
|
Services: serviceList,
|
|
QueryMeta: structs.QueryMeta{
|
|
Backend: structs.QueryBackendStreaming,
|
|
Index: index,
|
|
},
|
|
}
|
|
}
|
|
|
|
// filterByEnterpriseMeta filters the given set of events to remove those that
|
|
// don't match the request's enterprise meta - this is necessary because when
|
|
// subscribing to a topic with SubjectWildcard we'll get events for resources
|
|
// in all partitions and namespaces.
|
|
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
|
|
partition := entMeta.PartitionOrDefault()
|
|
namespace := entMeta.NamespaceOrDefault()
|
|
|
|
filtered := make([]*pbsubscribe.Event, 0, len(events))
|
|
for _, event := range events {
|
|
var eventEntMeta *pbcommon.EnterpriseMeta
|
|
switch payload := event.Payload.(type) {
|
|
case *pbsubscribe.Event_ConfigEntry:
|
|
eventEntMeta = payload.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
|
|
case *pbsubscribe.Event_Service:
|
|
eventEntMeta = payload.Service.GetEnterpriseMeta()
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if partition != acl.WildcardName && !acl.EqualPartitions(partition, eventEntMeta.GetPartition()) {
|
|
continue
|
|
}
|
|
if namespace != acl.WildcardName && !acl.EqualNamespaces(namespace, eventEntMeta.GetNamespace()) {
|
|
continue
|
|
}
|
|
|
|
filtered = append(filtered, event)
|
|
}
|
|
return filtered
|
|
}
|