Do not evaluate discovery chain for topology upstreams
This commit is contained in:
parent
63c50e15bc
commit
82a17ccee6
|
@ -2921,7 +2921,7 @@ func (s *Store) ServiceTopology(
|
||||||
sn = structs.NewServiceName(service, entMeta)
|
sn = structs.NewServiceName(service, entMeta)
|
||||||
)
|
)
|
||||||
|
|
||||||
idx, upstreamNames, err := s.upstreamsForServiceTxn(tx, ws, dc, sn)
|
idx, upstreamNames, err := upstreamsFromRegistrationTxn(tx, ws, sn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
@ -2988,48 +2988,6 @@ func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []stru
|
||||||
return maxIdx, resp, nil
|
return maxIdx, resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// upstreamsForServiceTxn will find all upstream services that the input could route traffic to.
|
|
||||||
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
|
||||||
// TODO (freddy): Account for ingress gateways
|
|
||||||
// TODO (freddy): Account for multi-dc upstreams
|
|
||||||
func (s *Store) upstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
|
|
||||||
idx, upstreams, err := upstreamsFromRegistrationTxn(tx, ws, sn)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var maxIdx uint64
|
|
||||||
if idx > maxIdx {
|
|
||||||
maxIdx = idx
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
resp []structs.ServiceName
|
|
||||||
seen = make(map[structs.ServiceName]bool)
|
|
||||||
)
|
|
||||||
for _, u := range upstreams {
|
|
||||||
// Evaluate the targets from the upstream's discovery chain
|
|
||||||
idx, targets, err := s.discoveryChainTargetsTxn(tx, ws, dc, u.Name, &u.EnterpriseMeta)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err)
|
|
||||||
}
|
|
||||||
if idx > maxIdx {
|
|
||||||
maxIdx = idx
|
|
||||||
}
|
|
||||||
for _, t := range targets {
|
|
||||||
if !seen[t] {
|
|
||||||
resp = append(resp, t)
|
|
||||||
seen[t] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(targets) == 0 && !seen[u] {
|
|
||||||
resp = append(resp, u)
|
|
||||||
seen[u] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return maxIdx, resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// downstreamsForServiceTxn will find all downstream services that could route traffic to the input service.
|
// downstreamsForServiceTxn will find all downstream services that could route traffic to the input service.
|
||||||
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
|
||||||
// TODO (freddy): Account for ingress gateways
|
// TODO (freddy): Account for ingress gateways
|
||||||
|
|
|
@ -6708,172 +6708,6 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
|
||||||
require.Empty(t, exp.names)
|
require.Empty(t, exp.names)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCatalog_UpstreamsForService(t *testing.T) {
|
|
||||||
defaultMeta := structs.DefaultEnterpriseMeta()
|
|
||||||
|
|
||||||
type expect struct {
|
|
||||||
idx uint64
|
|
||||||
names []structs.ServiceName
|
|
||||||
}
|
|
||||||
tt := []struct {
|
|
||||||
name string
|
|
||||||
services []*structs.NodeService
|
|
||||||
entries []structs.ConfigEntry
|
|
||||||
expect expect
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "kitchen sink",
|
|
||||||
services: []*structs.NodeService{
|
|
||||||
{
|
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
|
||||||
ID: "api-proxy",
|
|
||||||
Service: "api-proxy",
|
|
||||||
Address: "127.0.0.1",
|
|
||||||
Port: 443,
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "api",
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "cache",
|
|
||||||
},
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "db",
|
|
||||||
},
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "admin",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
EnterpriseMeta: *defaultMeta,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
|
||||||
ID: "api-proxy-2",
|
|
||||||
Service: "api-proxy",
|
|
||||||
Address: "127.0.0.2",
|
|
||||||
Port: 443,
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "api",
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "cache",
|
|
||||||
},
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "db",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
EnterpriseMeta: *defaultMeta,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
|
||||||
ID: "unrelated-proxy",
|
|
||||||
Service: "unrelated-proxy",
|
|
||||||
Address: "127.0.0.3",
|
|
||||||
Port: 443,
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "unrelated",
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationName: "teapot",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
EnterpriseMeta: *defaultMeta,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
entries: []structs.ConfigEntry{
|
|
||||||
&structs.ProxyConfigEntry{
|
|
||||||
Kind: structs.ProxyDefaults,
|
|
||||||
Name: structs.ProxyConfigGlobal,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&structs.ServiceRouterConfigEntry{
|
|
||||||
Kind: structs.ServiceRouter,
|
|
||||||
Name: "admin",
|
|
||||||
Routes: []structs.ServiceRoute{
|
|
||||||
{
|
|
||||||
Match: &structs.ServiceRouteMatch{
|
|
||||||
HTTP: &structs.ServiceRouteHTTPMatch{
|
|
||||||
PathExact: "/v2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Destination: &structs.ServiceRouteDestination{
|
|
||||||
Service: "new-admin",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&structs.ServiceResolverConfigEntry{
|
|
||||||
Kind: structs.ServiceResolver,
|
|
||||||
Name: "db",
|
|
||||||
Redirect: &structs.ServiceResolverRedirect{
|
|
||||||
Service: "cassandra",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&structs.ServiceResolverConfigEntry{
|
|
||||||
Kind: structs.ServiceResolver,
|
|
||||||
Name: "web",
|
|
||||||
Redirect: &structs.ServiceResolverRedirect{
|
|
||||||
Service: "sink",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expect: expect{
|
|
||||||
idx: 7,
|
|
||||||
names: []structs.ServiceName{
|
|
||||||
{Name: "cache", EnterpriseMeta: *defaultMeta},
|
|
||||||
{Name: "cassandra", EnterpriseMeta: *defaultMeta},
|
|
||||||
{Name: "admin", EnterpriseMeta: *defaultMeta},
|
|
||||||
{Name: "new-admin", EnterpriseMeta: *defaultMeta},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tt {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
s := testStateStore(t)
|
|
||||||
|
|
||||||
require.NoError(t, s.EnsureNode(0, &structs.Node{
|
|
||||||
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
|
|
||||||
Node: "foo",
|
|
||||||
}))
|
|
||||||
|
|
||||||
var i uint64 = 1
|
|
||||||
for _, svc := range tc.services {
|
|
||||||
require.NoError(t, s.EnsureService(i, "foo", svc))
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
|
|
||||||
ca := &structs.CAConfiguration{
|
|
||||||
Provider: "consul",
|
|
||||||
}
|
|
||||||
err := s.CASetConfig(0, ca)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
for _, entry := range tc.entries {
|
|
||||||
require.NoError(t, entry.Normalize())
|
|
||||||
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := s.db.ReadTxn()
|
|
||||||
defer tx.Abort()
|
|
||||||
|
|
||||||
ws := memdb.NewWatchSet()
|
|
||||||
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
|
|
||||||
idx, names, err := s.upstreamsForServiceTxn(tx, ws, "dc1", sn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.Equal(t, tc.expect.idx, idx)
|
|
||||||
require.ElementsMatch(t, tc.expect.names, names)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCatalog_DownstreamsForService(t *testing.T) {
|
func TestCatalog_DownstreamsForService(t *testing.T) {
|
||||||
defaultMeta := structs.DefaultEnterpriseMeta()
|
defaultMeta := structs.DefaultEnterpriseMeta()
|
||||||
|
|
||||||
|
|
|
@ -2411,11 +2411,11 @@ func (r *KeyringResponses) New() interface{} {
|
||||||
return new(KeyringResponses)
|
return new(KeyringResponses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpstreamDownstream pairs come from individual proxy registrations, which can be updated independently.
|
||||||
type UpstreamDownstream struct {
|
type UpstreamDownstream struct {
|
||||||
Upstream ServiceName
|
Upstream ServiceName
|
||||||
Downstream ServiceName
|
Downstream ServiceName
|
||||||
|
|
||||||
// Upstream/Downstream pairs come from individual service registrations, and they can be updated individually.
|
|
||||||
// Refs stores the registrations that contain this pairing.
|
// Refs stores the registrations that contain this pairing.
|
||||||
// When there are no remaining Refs, the UpstreamDownstream can be deleted.
|
// When there are no remaining Refs, the UpstreamDownstream can be deleted.
|
||||||
//
|
//
|
||||||
|
|
|
@ -256,8 +256,8 @@ RPC:
|
||||||
}
|
}
|
||||||
|
|
||||||
// UIServiceTopology returns the list of upstreams and downstreams for a Connect enabled service.
|
// UIServiceTopology returns the list of upstreams and downstreams for a Connect enabled service.
|
||||||
// - Downstreams are services that may route to the input service.
|
// - Downstreams are services that list the given service as an upstream
|
||||||
// - Upstreams are the upstreams defined in the target service's proxy registrations
|
// - Upstreams are the upstreams defined in the given service's proxy registrations
|
||||||
func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
// Parse arguments
|
// Parse arguments
|
||||||
args := structs.ServiceSpecificRequest{}
|
args := structs.ServiceSpecificRequest{}
|
||||||
|
|
Loading…
Reference in a new issue