Add func to combine up+downstream queries

This commit is contained in:
freddygv 2020-09-28 19:41:24 -06:00
parent 160a6539d1
commit ac54bf99b3
4 changed files with 132 additions and 18 deletions

View File

@ -2017,6 +2017,33 @@ func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.C
return nil
}
// CombinedCheckServiceNodes is used to query all nodes and checks for both typical and Connect endpoints of a service
func (s *Store) CombinedCheckServiceNodes(ws memdb.WatchSet, service structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var (
resp structs.CheckServiceNodes
maxIdx uint64
)
idx, csn, err := s.CheckServiceNodes(ws, service.Name, &service.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", service, err)
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, service.Name, &service.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", service, err)
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
return maxIdx, resp, nil
}
// CheckServiceNodes is used to query all nodes and checks for a given service.
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
return s.checkServiceNodes(ws, serviceName, false, entMeta)
@ -2882,17 +2909,93 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi
return idx, svc.Protocol == protocol, nil
}
func (s *Store) ServiceTopology(
ws memdb.WatchSet,
dc, service string,
entMeta *structs.EnterpriseMeta,
) (uint64, *structs.ServiceTopology, error) {
var (
maxIdx uint64
sn = structs.NewServiceName(service, entMeta)
)
idx, upstreamNames, err := s.UpstreamsForService(ws, dc, sn)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
var upstreams structs.CheckServiceNodes
for _, u := range upstreamNames {
// Collect both typical and connect endpoints, this allows aggregating check statuses across both
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstream nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
upstreams = append(upstreams, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstream connect nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
upstreams = append(upstreams, csn...)
}
idx, downstreamNames, err := s.DownstreamsForService(ws, dc, sn)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
var downstreams structs.CheckServiceNodes
for _, u := range downstreamNames {
// Collect both typical and connect endpoints, this allows aggregating check statuses across both
idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
downstreams = append(downstreams, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
downstreams = append(downstreams, csn...)
}
resp := &structs.ServiceTopology{
Upstreams: upstreams,
Downstreams: downstreams,
}
return 0, resp, nil
}
// UpstreamsForService will find all upstream services that the input could route traffic to.
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
// TODO (freddy): Account for ingress gateways
func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
sn := structs.NewServiceName(service, entMeta)
idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err)
}
var maxIdx uint64
@ -2930,15 +3033,14 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMe
// DownstreamsForService will find all downstream services that could route traffic to the input service.
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
// TODO (freddy): Account for ingress gateways
func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
// First fetch services with discovery chains that list the input as a target
sn := structs.NewServiceName(service, entMeta)
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service, entMeta)
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service)
if err != nil {
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", sn.String(), err)
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
}
var maxIdx uint64
@ -2954,7 +3056,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent
// We then follow these discovery chain sources one level down to the services defining them as an upstream.
idx, downstreams, err := downstreamsFromRegistration(ws, tx, s)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", s.String(), err)
return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err)
}
if idx > maxIdx {
maxIdx = idx
@ -2968,9 +3070,9 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent
}
// Also append services that directly listed the input as an upstream
idx, downstreams, err := downstreamsFromRegistration(ws, tx, sn)
idx, downstreams, err := downstreamsFromRegistration(ws, tx, service)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", service.String(), err)
}
if idx > maxIdx {
maxIdx = idx
@ -2994,7 +3096,7 @@ func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.Servi
return linkedFromRegistration(ws, tx, sn, true)
}
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
// To fetch upstreams we query services that have the input listed as a downstream
// To fetch downstreams we query services that have the input listed as an upstream
index := "downstream"
@ -3002,7 +3104,7 @@ func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceNam
index = "upstream"
}
iter, err := tx.Get(topologyTableName, index, sn)
iter, err := tx.Get(topologyTableName, index, service)
if err != nil {
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}

View File

@ -6861,7 +6861,8 @@ func TestCatalog_UpstreamsForService(t *testing.T) {
}
ws := memdb.NewWatchSet()
idx, names, err := s.UpstreamsForService(ws, "dc1", "api", structs.DefaultEnterpriseMeta())
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
idx, names, err := s.UpstreamsForService(ws, "dc1", sn)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
@ -6993,11 +6994,12 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
}
ws := memdb.NewWatchSet()
idx, ids, err := s.DownstreamsForService(ws, "dc1", "admin", structs.DefaultEnterpriseMeta())
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
idx, names, err := s.DownstreamsForService(ws, "dc1", sn)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, ids)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}

View File

@ -403,8 +403,7 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri
}
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
destination := structs.NewServiceName(service, entMeta)
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
queue := []structs.ServiceName{destination}
seenLink := make(map[structs.ServiceName]bool)
@ -444,7 +443,7 @@ func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service stri
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req)
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, &sn.EnterpriseMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
}

View File

@ -1855,6 +1855,17 @@ type IndexedGatewayServices struct {
QueryMeta
}
type IndexedServiceTopology struct {
ServiceTopology *ServiceTopology
FilteredByACLs bool
QueryMeta
}
type ServiceTopology struct {
Upstreams CheckServiceNodes
Downstreams CheckServiceNodes
}
// IndexedConfigEntries has its own encoding logic which differs from
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
type IndexedConfigEntries struct {