diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index fff8ba155..7e0fafb73 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2542,9 +2542,14 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet } // Delete all associated with gateway first, to avoid keeping mappings that were removed - if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(conf.GetName(), entMeta)); err != nil { + sn := structs.NewServiceName(conf.GetName(), entMeta) + + if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } + if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil { + return fmt.Errorf("failed to truncate mesh topology for gateway: %v", err) + } for _, svc := range gatewayServices { // If the service is a wildcard we need to target all services within the namespace @@ -2734,6 +2739,10 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService) if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } + + if err := insertGatewayServiceTopologyMapping(tx, idx, mapping); err != nil { + return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err) + } return nil } @@ -2785,6 +2794,9 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } + if err := deleteGatewayServiceTopologyMapping(tx, idx, gs); err != nil { + return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err) + } } } } @@ -3229,3 +3241,56 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro } return nil } + +func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { + // Only ingress gateways are standalone items in the mesh topology viz + if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier { + return nil + } + + mapping := structs.UpstreamDownstream{ + Upstream: gs.Service, + Downstream: gs.Gateway, + RaftIndex: gs.RaftIndex, + } + if err := tx.Insert(topologyTableName, &mapping); err != nil { + return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + + return nil +} + +func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error { + // Only ingress gateways are standalone items in the mesh topology viz + if gs.GatewayKind != structs.ServiceKindIngressGateway { + return nil + } + + if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + + return nil +} + +func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error { + // Only ingress gateways are standalone items in the mesh topology viz + if kind != string(structs.ServiceKindIngressGateway) { + return nil + } + + if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + + return nil +} diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index bef05520f..6ef5070ab 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6708,6 +6708,218 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { require.Empty(t, exp.names) } +func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) { + type expect struct { + idx uint64 + names []structs.ServiceName + } + + s := testStateStore(t) + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + require.NoError(t, s.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, nil)) + + defaultMeta := structs.DefaultEnterpriseMeta() + ingress := structs.NewServiceName("ingress", defaultMeta) + + ws := memdb.NewWatchSet() + tx := s.db.ReadTxn() + idx, names, err := upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + assert.Zero(t, idx) + assert.Len(t, names, 0) + + // Watch should fire since the ingress -> [web, api] mappings were inserted into the topology table + require.NoError(t, s.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "api", + EnterpriseMeta: *defaultMeta, + }, + { + Name: "web", + EnterpriseMeta: *defaultMeta, + }, + }, + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + + exp := expect{ + idx: 2, + names: []structs.ServiceName{ + {Name: "api", EnterpriseMeta: *defaultMeta}, + {Name: "web", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now delete a gateway service and topology table should be updated + require.NoError(t, s.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "api", + EnterpriseMeta: *defaultMeta, + }, + }, + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + + exp = expect{ + // Expect index where the upstream was replaced + idx: 3, + names: []structs.ServiceName{ + {Name: "api", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Now replace api with a wildcard and no services should be returned because none are registered + require.NoError(t, s.EnsureConfigEntry(4, &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "*", + EnterpriseMeta: *defaultMeta, + }, + }, + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + require.Equal(t, uint64(4), idx) + require.Len(t, names, 0) + + // Adding a service will be covered by the ingress wildcard and added to the topology + svc := structs.NodeService{ + ID: "db", + Service: "db", + Address: "127.0.0.3", + Port: 443, + EnterpriseMeta: *defaultMeta, + } + require.NoError(t, s.EnsureService(5, "foo", &svc)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + + exp = expect{ + // Expect index where the upstream was replaced + idx: 5, + names: []structs.ServiceName{ + {Name: "db", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Deleting a service covered by a wildcard should delete its mapping + require.NoError(t, s.DeleteService(6, "foo", svc.ID, &svc.EnterpriseMeta)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + require.Equal(t, uint64(6), idx) + require.Len(t, names, 0) + + // Now add a service again, to test the effect of deleting the config entry itself + require.NoError(t, s.EnsureConfigEntry(7, &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "api", + EnterpriseMeta: *defaultMeta, + }, + }, + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + + exp = expect{ + // Expect index where the upstream was replaced + idx: 7, + names: []structs.ServiceName{ + {Name: "api", EnterpriseMeta: *defaultMeta}, + }, + } + require.Equal(t, exp.idx, idx) + require.ElementsMatch(t, exp.names, names) + + // Deleting the config entry should remove the mapping + require.NoError(t, s.DeleteConfigEntry(8, "ingress-gateway", "ingress", defaultMeta)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + tx = s.db.ReadTxn() + idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress) + require.NoError(t, err) + require.Equal(t, uint64(8), idx) + require.Len(t, names, 0) +} + func TestCatalog_DownstreamsForService(t *testing.T) { defaultMeta := structs.DefaultEnterpriseMeta() diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 9779aa6aa..328ad15d3 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -265,14 +265,25 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct // If the config entry is for terminating or ingress gateways we delete entries from the memdb table // that associates gateways <-> services. + sn := structs.NewServiceName(name, entMeta) + if kind == structs.TerminatingGateway || kind == structs.IngressGateway { - if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)); err != nil { + if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } } + // Also clean up associations in the mesh topology table for ingress gateways + if kind == structs.IngressGateway { + if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + } + if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { + return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + } + } err = validateProposedConfigEntryInGraph(tx, kind, name, nil, entMeta) if err != nil { diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 38d8325e0..5802c7488 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -439,7 +439,7 @@ func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, exclud sum.ChecksCritical++ } } - if excludeSidecars && sum.Kind != structs.ServiceKindTypical { + if excludeSidecars && sum.Kind != structs.ServiceKindTypical && sum.Kind != structs.ServiceKindIngressGateway { continue } resp = append(resp, sum) diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index 1ca6f3147..07b13a579 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -828,6 +828,7 @@ func TestUIGatewayIntentions(t *testing.T) { a := NewTestAgent(t, "") defer a.Shutdown() + testrpc.WaitForServiceIntentions(t, a.RPC, "dc1") // Register terminating gateway and config entry linking it to postgres + redis { @@ -940,6 +941,31 @@ func TestUIServiceTopology(t *testing.T) { // Register api -> web -> redis { registrations := map[string]*structs.RegisterRequest{ + "Node edge": { + Datacenter: "dc1", + Node: "edge", + Address: "127.0.0.20", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "edge", + CheckID: "edge:alive", + Name: "edge-liveness", + Status: api.HealthPassing, + }, + }, + }, + "Ingress gateway on edge": { + Datacenter: "dc1", + Node: "edge", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindIngressGateway, + ID: "ingress", + Service: "ingress", + Port: 443, + Address: "198.18.1.20", + }, + }, "Node foo": { Datacenter: "dc1", Node: "foo", @@ -1205,7 +1231,8 @@ func TestUIServiceTopology(t *testing.T) { } } - // Add intentions: deny all, web -> redis with L7 perms, but omit intention for api -> web + // Add intentions: deny all, ingress -> api, web -> redis with L7 perms, but omit intention for api -> web + // Add ingress config: ingress -> api { entries := []structs.ConfigEntryRequest{ { @@ -1252,6 +1279,38 @@ func TestUIServiceTopology(t *testing.T) { }, }, }, + { + Datacenter: "dc1", + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "api", + Sources: []*structs.SourceIntention{ + { + Name: "ingress", + Action: structs.IntentionActionAllow, + }, + }, + }, + }, + { + Datacenter: "dc1", + Entry: &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "api", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + }, + }, + }, + }, } for _, req := range entries { out := false @@ -1259,6 +1318,45 @@ func TestUIServiceTopology(t *testing.T) { } } + t.Run("ingress", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + // Request topology for ingress + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.UIServiceTopology(resp, req) + assert.Nil(r, err) + require.NoError(r, checkIndex(resp)) + + expect := ServiceTopology{ + Upstreams: []*ServiceTopologySummary{ + { + ServiceSummary: ServiceSummary{ + Name: "api", + Datacenter: "dc1", + Nodes: []string{"foo"}, + InstanceCount: 1, + ChecksPassing: 3, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Intention: structs.IntentionDecisionSummary{ + Allowed: true, + HasPermissions: false, + }, + }, + }, + FilteredByACLs: false, + } + result := obj.(ServiceTopology) + + // Internal accounting that is not returned in JSON response + for _, u := range result.Upstreams { + u.externalSourceSet = nil + u.checks = nil + } + require.Equal(r, expect, result) + }) + }) + t.Run("api", func(t *testing.T) { retry.Run(t, func(r *retry.R) { // Request topology for api @@ -1269,6 +1367,23 @@ func TestUIServiceTopology(t *testing.T) { require.NoError(r, checkIndex(resp)) expect := ServiceTopology{ + Downstreams: []*ServiceTopologySummary{ + { + ServiceSummary: ServiceSummary{ + Name: "ingress", + Kind: structs.ServiceKindIngressGateway, + Datacenter: "dc1", + Nodes: []string{"edge"}, + InstanceCount: 1, + ChecksPassing: 1, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Intention: structs.IntentionDecisionSummary{ + Allowed: true, + HasPermissions: false, + }, + }, + }, Upstreams: []*ServiceTopologySummary{ { ServiceSummary: ServiceSummary{ @@ -1297,6 +1412,10 @@ func TestUIServiceTopology(t *testing.T) { u.externalSourceSet = nil u.checks = nil } + for _, d := range result.Downstreams { + d.externalSourceSet = nil + d.checks = nil + } require.Equal(r, expect, result) }) })