1cd7ec0543
This is the OSS portion of enterprise PR 2339. It improves our handling of "irrecoverable" errors in proxycfg data sources. The canonical example of this is what happens when the ACL token presented by Envoy is deleted/revoked. Previously, the stream would get "stuck" until the xDS server re-checked the token (after 5 minutes) and terminated the stream. Materializers would also sit burning resources retrying something that could never succeed. Now, it is possible for data sources to mark errors as "terminal" which causes the xDS stream to be closed immediately. Similarly, the submatview.Store will evict materializers when it observes they have encountered such an error.
63 lines
2 KiB
Go
63 lines
2 KiB
Go
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/hashicorp/go-memdb"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/watch"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
|
)
|
|
|
|
// ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
|
|
// by sourcing data from a blocking query against the server's state store.
|
|
func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {
|
|
return serverIntentionUpstreams{deps}
|
|
}
|
|
|
|
type serverIntentionUpstreams struct {
|
|
deps ServerDataSourceDeps
|
|
}
|
|
|
|
func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
target := structs.NewServiceName(req.ServiceName, &req.EnterpriseMeta)
|
|
|
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedServiceList, error) {
|
|
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defaultDecision := authz.IntentionDefaultAllow(nil)
|
|
|
|
index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, structs.IntentionTargetService)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
result := &structs.IndexedServiceList{
|
|
Services: services,
|
|
QueryMeta: structs.QueryMeta{
|
|
Index: index,
|
|
Backend: structs.QueryBackendBlocking,
|
|
},
|
|
}
|
|
aclfilter.New(authz, s.deps.Logger).Filter(result)
|
|
|
|
return index, result, nil
|
|
},
|
|
dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch),
|
|
)
|
|
}
|
|
|
|
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
|
|
return func(ctx context.Context, correlationID string, result ResultType, err error) {
|
|
select {
|
|
case ch <- newUpdateEvent(correlationID, result, err):
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
}
|