diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 7e591c377..bcdcf227a 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -123,6 +123,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event var nodeChanges map[string]changeType var serviceChanges map[nodeServiceTuple]serviceChange + var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange markNode := func(node string, typ changeType) { if nodeChanges == nil { @@ -201,9 +202,45 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect) } } + case gatewayServicesTableName: + gs := changeObject(change).(*structs.GatewayService) + if gs.GatewayKind != structs.ServiceKindTerminatingGateway { + continue + } + + gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change} + if termGatewayChanges == nil { + termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange) + } + + gatewayChanges, ok := termGatewayChanges[gs.Gateway] + if !ok { + termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{} + } + + prevChange, ok := gatewayChanges[gs.Service] + if !ok { + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + continue + } + + if changeTypeFromChange(change) == changeDelete { + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + continue + } + + prevGs := changeObject(prevChange.change).(*structs.GatewayService) + if !gs.IsSame(prevGs) { + gsChange.changeType = changeUpdate + termGatewayChanges[gs.Gateway][gs.Service] = gsChange + } else { + delete(termGatewayChanges[gs.Gateway], gs.Service) + } } } + //fmt.Printf("term gateway map: %v", termGatewayChanges) + // Now act on those marked nodes/services for node, changeType := range nodeChanges { if changeType == changeDelete { @@ -221,9 +258,6 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } for tuple, srvChange := range serviceChanges { - // change may be nil if there was a change that _affected_ the service - // like a change to checks but it didn't actually change the service - // record itself. if srvChange.changeType == changeDelete { sn := srvChange.change.Before.(*structs.ServiceNode) e := newServiceHealthEventDeregister(changes.Index, sn) @@ -265,6 +299,53 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event events = append(events, e) } + for gatewayName, serviceChanges := range termGatewayChanges { + for serviceName, gsChange := range serviceChanges { + gs := changeObject(gsChange.change).(*structs.GatewayService) + + _, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta) + if err != nil { + return nil, err + } + + // Always send deregister events for deletes/updates. + if gsChange.changeType != changeCreate { + for _, sn := range nodes { + e := newServiceHealthEventDeregister(changes.Index, sn) + + e.Topic = topicServiceHealthConnect + // todo(streaming): make namespace-aware in enterprise + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = serviceName.Name + e.Payload = payload + + events = append(events, e) + } + } + + if gsChange.changeType == changeDelete { + continue + } + + // Build service events and append them + for _, sn := range nodes { + tuple := newNodeServiceTupleFromServiceNode(sn) + e, err := newServiceHealthEventForService(tx, changes.Index, tuple) + if err != nil { + return nil, err + } + + e.Topic = topicServiceHealthConnect + // todo(streaming): make namespace-aware in enterprise + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = serviceName.Name + e.Payload = payload + + events = append(events, e) + } + } + } + // Duplicate any events that affected connect-enabled instances (proxies or // native apps) to the relevant Connect topic. connectEvents, err := serviceHealthToConnectEvents(tx, events...) diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index a16e1ce9f..17139ba82 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -174,6 +174,9 @@ func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { } func TestServiceHealthEventsFromChanges(t *testing.T) { + setupIndex := uint64(10) + mutateIndex := uint64(100) + cases := []struct { Name string Setup func(s *Store, tx *txn) error @@ -1051,6 +1054,48 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceTermingGateway("srv2")), }, }, + { + Name: "terminating gateway config entry created after gateway exists", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evServiceIndex(setupIndex)), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }, + // terminating gateway with 2 instances + // changing config entry to add a linked service + // changing config entry to remove a linked service + // deleting a config entry + // deregistering a service behind a terminating gateway (should send no term gateway events) } for _, tc := range cases { @@ -1061,7 +1106,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { if tc.Setup != nil { // Bypass the publish mechanism for this test or we get into odd // recursive stuff... - setupTx := s.db.WriteTxn(10) + setupTx := s.db.WriteTxn(setupIndex) require.NoError(t, tc.Setup(s, setupTx)) // Commit the underlying transaction without using wrapped Commit so we // avoid the whole event publishing system for setup here. It _should_ @@ -1070,7 +1115,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { setupTx.Txn.Commit() } - tx := s.db.WriteTxn(100) + tx := s.db.WriteTxn(mutateIndex) require.NoError(t, tc.Mutate(s, tx)) // Note we call the func under test directly rather than publishChanges so @@ -1120,6 +1165,23 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { } } +func evServiceIndex(idx uint64) func(e *stream.Event) error { + return func(e *stream.Event) error { + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.Value.Node.CreateIndex = idx + payload.Value.Node.ModifyIndex = idx + payload.Value.Service.CreateIndex = idx + payload.Value.Service.ModifyIndex = idx + for _, check := range payload.Value.Checks { + check.CreateIndex = idx + check.ModifyIndex = idx + } + e.Payload = payload + + return nil + } +} + func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() if diff := cmp.Diff(x, y, opts...); diff != "" {