e3bff8fb39
This is the OSS portion of enterprise PR 2352. It adds a server-local implementation of the proxycfg.PeeredUpstreams interface based on a blocking query against the server's state store. It also fixes an omission in the Virtual IP freeing logic where we were never updating the max index (and therefore blocking queries against VirtualIPsForAllImportedServices would not return on service deletion).
56 lines
1.8 KiB
Go
56 lines
1.8 KiB
Go
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/hashicorp/go-memdb"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/consul/watch"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface
|
|
// by sourcing data from the agent cache.
|
|
func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams {
|
|
return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName}
|
|
}
|
|
|
|
// ServerPeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface by
|
|
// sourcing data from a blocking query against the server's state store.
|
|
func ServerPeeredUpstreams(deps ServerDataSourceDeps) proxycfg.PeeredUpstreams {
|
|
return &serverPeeredUpstreams{deps}
|
|
}
|
|
|
|
type serverPeeredUpstreams struct {
|
|
deps ServerDataSourceDeps
|
|
}
|
|
|
|
func (s *serverPeeredUpstreams) Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
|
// TODO(peering): ACL filtering.
|
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedPeeredServiceList, error) {
|
|
index, vips, err := store.VirtualIPsForAllImportedServices(ws, req.EnterpriseMeta)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
result := make([]structs.PeeredServiceName, 0, len(vips))
|
|
for _, vip := range vips {
|
|
result = append(result, vip.Service)
|
|
}
|
|
|
|
return index, &structs.IndexedPeeredServiceList{
|
|
Services: result,
|
|
QueryMeta: structs.QueryMeta{
|
|
Index: index,
|
|
Backend: structs.QueryBackendBlocking,
|
|
},
|
|
}, nil
|
|
},
|
|
dispatchBlockingQueryUpdate[*structs.IndexedPeeredServiceList](ch),
|
|
)
|
|
}
|