Support ingress gateways in mesh viz endpoint (#8864)

Co-authored-by: R.B. Boyer <rb@hashicorp.com>
This commit is contained in:
Freddy 2020-10-08 09:47:09 -06:00 committed by GitHub
parent 75847b0f11
commit de4af766f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 411 additions and 4 deletions

View File

@ -2542,9 +2542,14 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet
}
// Delete all associated with gateway first, to avoid keeping mappings that were removed
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(conf.GetName(), entMeta)); err != nil {
sn := structs.NewServiceName(conf.GetName(), entMeta)
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
return fmt.Errorf("failed to truncate mesh topology for gateway: %v", err)
}
for _, svc := range gatewayServices {
// If the service is a wildcard we need to target all services within the namespace
@ -2734,6 +2739,10 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService)
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
if err := insertGatewayServiceTopologyMapping(tx, idx, mapping); err != nil {
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
}
return nil
}
@ -2785,6 +2794,9 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
if err := deleteGatewayServiceTopologyMapping(tx, idx, gs); err != nil {
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
}
}
}
}
@ -3229,3 +3241,56 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro
}
return nil
}
func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier {
return nil
}
mapping := structs.UpstreamDownstream{
Upstream: gs.Service,
Downstream: gs.Gateway,
RaftIndex: gs.RaftIndex,
}
if err := tx.Insert(topologyTableName, &mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
return nil
}
func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
// Only ingress gateways are standalone items in the mesh topology viz
if gs.GatewayKind != structs.ServiceKindIngressGateway {
return nil
}
if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
return nil
}
func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error {
// Only ingress gateways are standalone items in the mesh topology viz
if kind != string(structs.ServiceKindIngressGateway) {
return nil
}
if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
return nil
}

View File

@ -6708,6 +6708,218 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
require.Empty(t, exp.names)
}
func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
type expect struct {
idx uint64
names []structs.ServiceName
}
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
require.NoError(t, s.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
}, nil))
defaultMeta := structs.DefaultEnterpriseMeta()
ingress := structs.NewServiceName("ingress", defaultMeta)
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
assert.Zero(t, idx)
assert.Len(t, names, 0)
// Watch should fire since the ingress -> [web, api] mappings were inserted into the topology table
require.NoError(t, s.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "api",
EnterpriseMeta: *defaultMeta,
},
{
Name: "web",
EnterpriseMeta: *defaultMeta,
},
},
},
},
}, nil))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
exp := expect{
idx: 2,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
{Name: "web", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete a gateway service and topology table should be updated
require.NoError(t, s.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "api",
EnterpriseMeta: *defaultMeta,
},
},
},
},
}, nil))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 3,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now replace api with a wildcard and no services should be returned because none are registered
require.NoError(t, s.EnsureConfigEntry(4, &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "*",
EnterpriseMeta: *defaultMeta,
},
},
},
},
}, nil))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
require.Equal(t, uint64(4), idx)
require.Len(t, names, 0)
// Adding a service will be covered by the ingress wildcard and added to the topology
svc := structs.NodeService{
ID: "db",
Service: "db",
Address: "127.0.0.3",
Port: 443,
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(5, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 5,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Deleting a service covered by a wildcard should delete its mapping
require.NoError(t, s.DeleteService(6, "foo", svc.ID, &svc.EnterpriseMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
require.Equal(t, uint64(6), idx)
require.Len(t, names, 0)
// Now add a service again, to test the effect of deleting the config entry itself
require.NoError(t, s.EnsureConfigEntry(7, &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "api",
EnterpriseMeta: *defaultMeta,
},
},
},
},
}, nil))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 7,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Deleting the config entry should remove the mapping
require.NoError(t, s.DeleteConfigEntry(8, "ingress-gateway", "ingress", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
require.NoError(t, err)
require.Equal(t, uint64(8), idx)
require.Len(t, names, 0)
}
func TestCatalog_DownstreamsForService(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()

View File

@ -265,14 +265,25 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
// If the config entry is for terminating or ingress gateways we delete entries from the memdb table
// that associates gateways <-> services.
sn := structs.NewServiceName(name, entMeta)
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)); err != nil {
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
// Also clean up associations in the mesh topology table for ingress gateways
if kind == structs.IngressGateway {
if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
}
err = validateProposedConfigEntryInGraph(tx, kind, name, nil, entMeta)
if err != nil {

View File

@ -439,7 +439,7 @@ func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, exclud
sum.ChecksCritical++
}
}
if excludeSidecars && sum.Kind != structs.ServiceKindTypical {
if excludeSidecars && sum.Kind != structs.ServiceKindTypical && sum.Kind != structs.ServiceKindIngressGateway {
continue
}
resp = append(resp, sum)

View File

@ -828,6 +828,7 @@ func TestUIGatewayIntentions(t *testing.T) {
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForServiceIntentions(t, a.RPC, "dc1")
// Register terminating gateway and config entry linking it to postgres + redis
{
@ -940,6 +941,31 @@ func TestUIServiceTopology(t *testing.T) {
// Register api -> web -> redis
{
registrations := map[string]*structs.RegisterRequest{
"Node edge": {
Datacenter: "dc1",
Node: "edge",
Address: "127.0.0.20",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "edge",
CheckID: "edge:alive",
Name: "edge-liveness",
Status: api.HealthPassing,
},
},
},
"Ingress gateway on edge": {
Datacenter: "dc1",
Node: "edge",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindIngressGateway,
ID: "ingress",
Service: "ingress",
Port: 443,
Address: "198.18.1.20",
},
},
"Node foo": {
Datacenter: "dc1",
Node: "foo",
@ -1205,7 +1231,8 @@ func TestUIServiceTopology(t *testing.T) {
}
}
// Add intentions: deny all, web -> redis with L7 perms, but omit intention for api -> web
// Add intentions: deny all, ingress -> api, web -> redis with L7 perms, but omit intention for api -> web
// Add ingress config: ingress -> api
{
entries := []structs.ConfigEntryRequest{
{
@ -1252,6 +1279,38 @@ func TestUIServiceTopology(t *testing.T) {
},
},
},
{
Datacenter: "dc1",
Entry: &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "api",
Sources: []*structs.SourceIntention{
{
Name: "ingress",
Action: structs.IntentionActionAllow,
},
},
},
},
{
Datacenter: "dc1",
Entry: &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 1111,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "api",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
},
},
},
},
}
for _, req := range entries {
out := false
@ -1259,6 +1318,45 @@ func TestUIServiceTopology(t *testing.T) {
}
}
t.Run("ingress", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
// Request topology for ingress
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIServiceTopology(resp, req)
assert.Nil(r, err)
require.NoError(r, checkIndex(resp))
expect := ServiceTopology{
Upstreams: []*ServiceTopologySummary{
{
ServiceSummary: ServiceSummary{
Name: "api",
Datacenter: "dc1",
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 3,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Intention: structs.IntentionDecisionSummary{
Allowed: true,
HasPermissions: false,
},
},
},
FilteredByACLs: false,
}
result := obj.(ServiceTopology)
// Internal accounting that is not returned in JSON response
for _, u := range result.Upstreams {
u.externalSourceSet = nil
u.checks = nil
}
require.Equal(r, expect, result)
})
})
t.Run("api", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
// Request topology for api
@ -1269,6 +1367,23 @@ func TestUIServiceTopology(t *testing.T) {
require.NoError(r, checkIndex(resp))
expect := ServiceTopology{
Downstreams: []*ServiceTopologySummary{
{
ServiceSummary: ServiceSummary{
Name: "ingress",
Kind: structs.ServiceKindIngressGateway,
Datacenter: "dc1",
Nodes: []string{"edge"},
InstanceCount: 1,
ChecksPassing: 1,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Intention: structs.IntentionDecisionSummary{
Allowed: true,
HasPermissions: false,
},
},
},
Upstreams: []*ServiceTopologySummary{
{
ServiceSummary: ServiceSummary{
@ -1297,6 +1412,10 @@ func TestUIServiceTopology(t *testing.T) {
u.externalSourceSet = nil
u.checks = nil
}
for _, d := range result.Downstreams {
d.externalSourceSet = nil
d.checks = nil
}
require.Equal(r, expect, result)
})
})