185 lines
5.2 KiB
Go
185 lines
5.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/submatview"
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// CacheIntentions satisfies the proxycfg.Intentions interface by sourcing data
|
|
// from the agent cache.
|
|
func CacheIntentions(c *cache.Cache) proxycfg.Intentions {
|
|
return cacheIntentions{c}
|
|
}
|
|
|
|
type cacheIntentions struct {
|
|
c *cache.Cache
|
|
}
|
|
|
|
func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
query := &structs.IntentionQueryRequest{
|
|
Match: &structs.IntentionQueryMatch{
|
|
Type: structs.IntentionMatchDestination,
|
|
Entries: []structs.IntentionMatchEntry{
|
|
{
|
|
Partition: req.PartitionOrDefault(),
|
|
Namespace: req.NamespaceOrDefault(),
|
|
Name: req.ServiceName,
|
|
},
|
|
},
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: req.QueryOptions.Token},
|
|
}
|
|
return c.c.NotifyCallback(ctx, cachetype.IntentionMatchName, query, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
|
|
var result any
|
|
if event.Err == nil {
|
|
rsp, ok := event.Result.(*structs.IndexedIntentionMatches)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var matches structs.Intentions
|
|
if len(rsp.Matches) != 0 {
|
|
matches = rsp.Matches[0]
|
|
}
|
|
result = matches
|
|
}
|
|
|
|
select {
|
|
case ch <- newUpdateEvent(correlationID, result, event.Err):
|
|
case <-ctx.Done():
|
|
}
|
|
})
|
|
}
|
|
|
|
// ServerIntentions satisfies the proxycfg.Intentions interface by sourcing
|
|
// data from local materialized views (backed by EventPublisher subscriptions).
|
|
func ServerIntentions(deps ServerDataSourceDeps) proxycfg.Intentions {
|
|
return &serverIntentions{deps}
|
|
}
|
|
|
|
type serverIntentions struct {
|
|
deps ServerDataSourceDeps
|
|
}
|
|
|
|
func (s *serverIntentions) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
// We may consume *multiple* streams (to handle wildcard intentions) and merge
|
|
// them into a single list of intentions.
|
|
//
|
|
// An alternative approach would be to consume events for all intentions and
|
|
// filter out the irrelevant ones. This would remove some complexity here but
|
|
// at the expense of significant overhead.
|
|
subjects := s.buildSubjects(req.ServiceName, req.EnterpriseMeta)
|
|
|
|
// mu guards state, as the callback functions provided in NotifyCallback below
|
|
// will be called in different goroutines.
|
|
var mu sync.Mutex
|
|
state := make([]*structs.ConfigEntryResponse, len(subjects))
|
|
|
|
// buildEvent constructs an event containing the matching intentions received
|
|
// from NotifyCallback calls below. If we have not received initial snapshots
|
|
// for all streams yet, the event will be empty and the second return value will
|
|
// be false (causing no event to be emittied).
|
|
//
|
|
// Note: mu must be held when calling this function.
|
|
buildEvent := func() (proxycfg.UpdateEvent, bool) {
|
|
intentions := make(structs.Intentions, 0)
|
|
|
|
for _, result := range state {
|
|
if result == nil {
|
|
return proxycfg.UpdateEvent{}, false
|
|
}
|
|
si, ok := result.Entry.(*structs.ServiceIntentionsConfigEntry)
|
|
if !ok {
|
|
continue
|
|
}
|
|
intentions = append(intentions, si.ToIntentions()...)
|
|
}
|
|
|
|
sort.Sort(structs.IntentionPrecedenceSorter(intentions))
|
|
|
|
return newUpdateEvent(correlationID, intentions, nil), true
|
|
}
|
|
|
|
for subjectIdx, subject := range subjects {
|
|
subjectIdx := subjectIdx
|
|
|
|
storeReq := intentionsRequest{
|
|
deps: s.deps,
|
|
baseReq: req,
|
|
subject: subject,
|
|
}
|
|
err := s.deps.ViewStore.NotifyCallback(ctx, storeReq, correlationID, func(ctx context.Context, cacheEvent cache.UpdateEvent) {
|
|
mu.Lock()
|
|
state[subjectIdx] = cacheEvent.Result.(*structs.ConfigEntryResponse)
|
|
event, ready := buildEvent()
|
|
mu.Unlock()
|
|
|
|
if ready {
|
|
select {
|
|
case ch <- event:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type intentionsRequest struct {
|
|
deps ServerDataSourceDeps
|
|
baseReq *structs.ServiceSpecificRequest
|
|
subject *pbsubscribe.NamedSubject
|
|
}
|
|
|
|
func (r intentionsRequest) CacheInfo() cache.RequestInfo {
|
|
info := r.baseReq.CacheInfo()
|
|
info.Key = fmt.Sprintf("%s/%s/%s/%s",
|
|
r.subject.PeerName,
|
|
r.subject.Partition,
|
|
r.subject.Namespace,
|
|
r.subject.Key,
|
|
)
|
|
return info
|
|
}
|
|
|
|
func (r intentionsRequest) NewMaterializer() (submatview.Materializer, error) {
|
|
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
|
|
Backend: r.deps.EventPublisher,
|
|
ACLResolver: r.deps.ACLResolver,
|
|
Deps: submatview.Deps{
|
|
View: &configEntryView{},
|
|
Logger: r.deps.Logger,
|
|
Request: r.Request,
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
func (r intentionsRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
|
|
return &pbsubscribe.SubscribeRequest{
|
|
Topic: pbsubscribe.Topic_ServiceIntentions,
|
|
Index: index,
|
|
Datacenter: r.baseReq.Datacenter,
|
|
Token: r.baseReq.Token,
|
|
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{NamedSubject: r.subject},
|
|
}
|
|
}
|
|
|
|
func (r intentionsRequest) Type() string { return "proxycfgglue.ServiceIntentions" }
|