state: Add terminating gateway events on updating a config entry
Co-Authored-By: Daniel Nephin <dnephin@hashicorp.com>
This commit is contained in:
parent
f42a2ca8a3
commit
f31582624d
|
@ -123,6 +123,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
|
||||
var nodeChanges map[string]changeType
|
||||
var serviceChanges map[nodeServiceTuple]serviceChange
|
||||
var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange
|
||||
|
||||
markNode := func(node string, typ changeType) {
|
||||
if nodeChanges == nil {
|
||||
|
@ -201,9 +202,45 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
||||
}
|
||||
}
|
||||
case gatewayServicesTableName:
|
||||
gs := changeObject(change).(*structs.GatewayService)
|
||||
if gs.GatewayKind != structs.ServiceKindTerminatingGateway {
|
||||
continue
|
||||
}
|
||||
|
||||
gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
||||
if termGatewayChanges == nil {
|
||||
termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange)
|
||||
}
|
||||
|
||||
gatewayChanges, ok := termGatewayChanges[gs.Gateway]
|
||||
if !ok {
|
||||
termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{}
|
||||
}
|
||||
|
||||
prevChange, ok := gatewayChanges[gs.Service]
|
||||
if !ok {
|
||||
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
||||
continue
|
||||
}
|
||||
|
||||
if changeTypeFromChange(change) == changeDelete {
|
||||
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
||||
continue
|
||||
}
|
||||
|
||||
prevGs := changeObject(prevChange.change).(*structs.GatewayService)
|
||||
if !gs.IsSame(prevGs) {
|
||||
gsChange.changeType = changeUpdate
|
||||
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
||||
} else {
|
||||
delete(termGatewayChanges[gs.Gateway], gs.Service)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//fmt.Printf("term gateway map: %v", termGatewayChanges)
|
||||
|
||||
// Now act on those marked nodes/services
|
||||
for node, changeType := range nodeChanges {
|
||||
if changeType == changeDelete {
|
||||
|
@ -221,9 +258,6 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
}
|
||||
|
||||
for tuple, srvChange := range serviceChanges {
|
||||
// change may be nil if there was a change that _affected_ the service
|
||||
// like a change to checks but it didn't actually change the service
|
||||
// record itself.
|
||||
if srvChange.changeType == changeDelete {
|
||||
sn := srvChange.change.Before.(*structs.ServiceNode)
|
||||
e := newServiceHealthEventDeregister(changes.Index, sn)
|
||||
|
@ -265,6 +299,53 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
events = append(events, e)
|
||||
}
|
||||
|
||||
for gatewayName, serviceChanges := range termGatewayChanges {
|
||||
for serviceName, gsChange := range serviceChanges {
|
||||
gs := changeObject(gsChange.change).(*structs.GatewayService)
|
||||
|
||||
_, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Always send deregister events for deletes/updates.
|
||||
if gsChange.changeType != changeCreate {
|
||||
for _, sn := range nodes {
|
||||
e := newServiceHealthEventDeregister(changes.Index, sn)
|
||||
|
||||
e.Topic = topicServiceHealthConnect
|
||||
// todo(streaming): make namespace-aware in enterprise
|
||||
payload := e.Payload.(EventPayloadCheckServiceNode)
|
||||
payload.key = serviceName.Name
|
||||
e.Payload = payload
|
||||
|
||||
events = append(events, e)
|
||||
}
|
||||
}
|
||||
|
||||
if gsChange.changeType == changeDelete {
|
||||
continue
|
||||
}
|
||||
|
||||
// Build service events and append them
|
||||
for _, sn := range nodes {
|
||||
tuple := newNodeServiceTupleFromServiceNode(sn)
|
||||
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.Topic = topicServiceHealthConnect
|
||||
// todo(streaming): make namespace-aware in enterprise
|
||||
payload := e.Payload.(EventPayloadCheckServiceNode)
|
||||
payload.key = serviceName.Name
|
||||
e.Payload = payload
|
||||
|
||||
events = append(events, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Duplicate any events that affected connect-enabled instances (proxies or
|
||||
// native apps) to the relevant Connect topic.
|
||||
connectEvents, err := serviceHealthToConnectEvents(tx, events...)
|
||||
|
|
|
@ -174,6 +174,9 @@ func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
|
|||
}
|
||||
|
||||
func TestServiceHealthEventsFromChanges(t *testing.T) {
|
||||
setupIndex := uint64(10)
|
||||
mutateIndex := uint64(100)
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
Setup func(s *Store, tx *txn) error
|
||||
|
@ -1051,6 +1054,48 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
|
|||
evServiceTermingGateway("srv2")),
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "terminating gateway config entry created after gateway exists",
|
||||
Setup: func(s *Store, tx *txn) error {
|
||||
return s.ensureRegistrationTxn(tx, tx.Index, false,
|
||||
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
|
||||
},
|
||||
Mutate: func(s *Store, tx *txn) error {
|
||||
configEntry := &structs.TerminatingGatewayConfigEntry{
|
||||
Kind: structs.TerminatingGateway,
|
||||
Name: "tgate1",
|
||||
Services: []structs.LinkedService{
|
||||
{
|
||||
Name: "srv1",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
},
|
||||
{
|
||||
Name: "srv2",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
|
||||
},
|
||||
WantEvents: []stream.Event{
|
||||
testServiceHealthEvent(t,
|
||||
"tgate1",
|
||||
evConnectTopic,
|
||||
evServiceTermingGateway("srv1"),
|
||||
evServiceIndex(setupIndex)),
|
||||
testServiceHealthEvent(t,
|
||||
"tgate1",
|
||||
evConnectTopic,
|
||||
evServiceTermingGateway("srv2"),
|
||||
evServiceIndex(setupIndex)),
|
||||
},
|
||||
},
|
||||
// terminating gateway with 2 instances
|
||||
// changing config entry to add a linked service
|
||||
// changing config entry to remove a linked service
|
||||
// deleting a config entry
|
||||
// deregistering a service behind a terminating gateway (should send no term gateway events)
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
|
@ -1061,7 +1106,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
|
|||
if tc.Setup != nil {
|
||||
// Bypass the publish mechanism for this test or we get into odd
|
||||
// recursive stuff...
|
||||
setupTx := s.db.WriteTxn(10)
|
||||
setupTx := s.db.WriteTxn(setupIndex)
|
||||
require.NoError(t, tc.Setup(s, setupTx))
|
||||
// Commit the underlying transaction without using wrapped Commit so we
|
||||
// avoid the whole event publishing system for setup here. It _should_
|
||||
|
@ -1070,7 +1115,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
|
|||
setupTx.Txn.Commit()
|
||||
}
|
||||
|
||||
tx := s.db.WriteTxn(100)
|
||||
tx := s.db.WriteTxn(mutateIndex)
|
||||
require.NoError(t, tc.Mutate(s, tx))
|
||||
|
||||
// Note we call the func under test directly rather than publishChanges so
|
||||
|
@ -1120,6 +1165,23 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error {
|
|||
}
|
||||
}
|
||||
|
||||
func evServiceIndex(idx uint64) func(e *stream.Event) error {
|
||||
return func(e *stream.Event) error {
|
||||
payload := e.Payload.(EventPayloadCheckServiceNode)
|
||||
payload.Value.Node.CreateIndex = idx
|
||||
payload.Value.Node.ModifyIndex = idx
|
||||
payload.Value.Service.CreateIndex = idx
|
||||
payload.Value.Service.ModifyIndex = idx
|
||||
for _, check := range payload.Value.Checks {
|
||||
check.CreateIndex = idx
|
||||
check.ModifyIndex = idx
|
||||
}
|
||||
e.Payload = payload
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
|
||||
t.Helper()
|
||||
if diff := cmp.Diff(x, y, opts...); diff != "" {
|
||||
|
|
Loading…
Reference in New Issue