open-consul/agent/proxycfg-glue/intentions.go
Daniel Upton 1cd7ec0543 proxycfg: terminate stream on irrecoverable errors
This is the OSS portion of enterprise PR 2339.

It improves our handling of "irrecoverable" errors in proxycfg data sources.

The canonical example of this is what happens when the ACL token presented by
Envoy is deleted/revoked. Previously, the stream would get "stuck" until the
xDS server re-checked the token (after 5 minutes) and terminated the stream.

Materializers would also sit burning resources retrying something that could
never succeed.

Now, it is possible for data sources to mark errors as "terminal" which causes
the xDS stream to be closed immediately. Similarly, the submatview.Store will
evict materializers when it observes they have encountered such an error.
2022-08-23 20:17:49 +01:00

182 lines
5.1 KiB
Go

package proxycfgglue
import (
"context"
"fmt"
"sort"
"sync"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// CacheIntentions satisfies the proxycfg.Intentions interface by sourcing data
// from the agent cache.
func CacheIntentions(c *cache.Cache) proxycfg.Intentions {
return cacheIntentions{c}
}
type cacheIntentions struct {
c *cache.Cache
}
func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
query := &structs.IntentionQueryRequest{
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{
Partition: req.PartitionOrDefault(),
Namespace: req.NamespaceOrDefault(),
Name: req.ServiceName,
},
},
},
QueryOptions: structs.QueryOptions{Token: req.QueryOptions.Token},
}
return c.c.NotifyCallback(ctx, cachetype.IntentionMatchName, query, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
var result any
if event.Err == nil {
rsp, ok := event.Result.(*structs.IndexedIntentionMatches)
if !ok {
return
}
var matches structs.Intentions
if len(rsp.Matches) != 0 {
matches = rsp.Matches[0]
}
result = matches
}
select {
case ch <- newUpdateEvent(correlationID, result, event.Err):
case <-ctx.Done():
}
})
}
// ServerIntentions satisfies the proxycfg.Intentions interface by sourcing
// data from local materialized views (backed by EventPublisher subscriptions).
func ServerIntentions(deps ServerDataSourceDeps) proxycfg.Intentions {
return &serverIntentions{deps}
}
type serverIntentions struct {
deps ServerDataSourceDeps
}
func (s *serverIntentions) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
// We may consume *multiple* streams (to handle wildcard intentions) and merge
// them into a single list of intentions.
//
// An alternative approach would be to consume events for all intentions and
// filter out the irrelevant ones. This would remove some complexity here but
// at the expense of significant overhead.
subjects := s.buildSubjects(req.ServiceName, req.EnterpriseMeta)
// mu guards state, as the callback functions provided in NotifyCallback below
// will be called in different goroutines.
var mu sync.Mutex
state := make([]*structs.ConfigEntryResponse, len(subjects))
// buildEvent constructs an event containing the matching intentions received
// from NotifyCallback calls below. If we have not received initial snapshots
// for all streams yet, the event will be empty and the second return value will
// be false (causing no event to be emittied).
//
// Note: mu must be held when calling this function.
buildEvent := func() (proxycfg.UpdateEvent, bool) {
intentions := make(structs.Intentions, 0)
for _, result := range state {
if result == nil {
return proxycfg.UpdateEvent{}, false
}
si, ok := result.Entry.(*structs.ServiceIntentionsConfigEntry)
if !ok {
continue
}
intentions = append(intentions, si.ToIntentions()...)
}
sort.Sort(structs.IntentionPrecedenceSorter(intentions))
return newUpdateEvent(correlationID, intentions, nil), true
}
for subjectIdx, subject := range subjects {
subjectIdx := subjectIdx
storeReq := intentionsRequest{
deps: s.deps,
baseReq: req,
subject: subject,
}
err := s.deps.ViewStore.NotifyCallback(ctx, storeReq, correlationID, func(ctx context.Context, cacheEvent cache.UpdateEvent) {
mu.Lock()
state[subjectIdx] = cacheEvent.Result.(*structs.ConfigEntryResponse)
event, ready := buildEvent()
mu.Unlock()
if ready {
select {
case ch <- event:
case <-ctx.Done():
}
}
})
if err != nil {
return err
}
}
return nil
}
type intentionsRequest struct {
deps ServerDataSourceDeps
baseReq *structs.ServiceSpecificRequest
subject *pbsubscribe.NamedSubject
}
func (r intentionsRequest) CacheInfo() cache.RequestInfo {
info := r.baseReq.CacheInfo()
info.Key = fmt.Sprintf("%s/%s/%s/%s",
r.subject.PeerName,
r.subject.Partition,
r.subject.Namespace,
r.subject.Key,
)
return info
}
func (r intentionsRequest) NewMaterializer() (submatview.Materializer, error) {
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
Backend: r.deps.EventPublisher,
ACLResolver: r.deps.ACLResolver,
Deps: submatview.Deps{
View: &configEntryView{},
Logger: r.deps.Logger,
Request: r.Request,
},
}), nil
}
func (r intentionsRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
return &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index,
Datacenter: r.baseReq.Datacenter,
Token: r.baseReq.Token,
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{NamedSubject: r.subject},
}
}
func (r intentionsRequest) Type() string { return "proxycfgglue.ServiceIntentions" }