state: Add two more tests for connect events with terminating gateways

And expand one test case to cover more.

Co-Authored-By: Kyle Havlovitz <kylehav@gmail.com>
This commit is contained in:
Daniel Nephin 2021-01-20 15:10:48 -05:00
parent abab373b89
commit 15b0d5f62b
3 changed files with 135 additions and 33 deletions

View File

@ -900,18 +900,19 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn,
// compatible destination for the given service name. This will include // compatible destination for the given service name. This will include
// both proxies and native integrations. // both proxies and native integrations.
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
return s.serviceNodes(ws, serviceName, true, entMeta) tx := s.db.ReadTxn()
defer tx.Abort()
return serviceNodesTxn(tx, ws, serviceName, true, entMeta)
} }
// ServiceNodes returns the nodes associated with a given service name. // ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
return s.serviceNodes(ws, serviceName, false, entMeta) tx := s.db.ReadTxn()
defer tx.Abort()
return serviceNodesTxn(tx, ws, serviceName, false, entMeta)
} }
func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Function for lookup // Function for lookup
index := "service" index := "service"
if connect { if connect {

View File

@ -210,38 +210,29 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
} }
gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change} gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
if termGatewayChanges == nil { if termGatewayChanges == nil {
termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange) termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange)
} }
gatewayChanges, ok := termGatewayChanges[gs.Gateway] _, ok := termGatewayChanges[gs.Gateway]
if !ok { if !ok {
termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{} termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{}
} }
prevChange, ok := gatewayChanges[gs.Service] switch gsChange.changeType {
if !ok { case changeUpdate:
after := gsChange.change.After.(*structs.GatewayService)
if gsChange.change.Before.(*structs.GatewayService).IsSame(after) {
continue
}
termGatewayChanges[gs.Gateway][gs.Service] = gsChange termGatewayChanges[gs.Gateway][gs.Service] = gsChange
continue case changeDelete, changeCreate:
}
if changeTypeFromChange(change) == changeDelete {
termGatewayChanges[gs.Gateway][gs.Service] = gsChange termGatewayChanges[gs.Gateway][gs.Service] = gsChange
continue
}
prevGs := changeObject(prevChange.change).(*structs.GatewayService)
if !gs.IsSame(prevGs) {
gsChange.changeType = changeUpdate
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
} else {
delete(termGatewayChanges[gs.Gateway], gs.Service)
} }
} }
} }
//fmt.Printf("term gateway map: %v", termGatewayChanges)
// Now act on those marked nodes/services // Now act on those marked nodes/services
for node, changeType := range nodeChanges { for node, changeType := range nodeChanges {
if changeType == changeDelete { if changeType == changeDelete {
@ -304,7 +295,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
for serviceName, gsChange := range serviceChanges { for serviceName, gsChange := range serviceChanges {
gs := changeObject(gsChange.change).(*structs.GatewayService) gs := changeObject(gsChange.change).(*structs.GatewayService)
_, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta) _, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1037,8 +1037,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
}, },
Mutate: func(s *Store, tx *txn) error { Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false, if err := s.ensureRegistrationTxn(
testServiceRegistration(t, "tgate1", regTerminatingGateway), false) tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false,
); err != nil {
return err
}
return s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false)
}, },
WantEvents: []stream.Event{ WantEvents: []stream.Event{
testServiceHealthEvent(t, testServiceHealthEvent(t,
@ -1052,6 +1059,20 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
"tgate1", "tgate1",
evConnectTopic, evConnectTopic,
evServiceTermingGateway("srv2")), evServiceTermingGateway("srv2")),
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evNode2),
}, },
}, },
{ {
@ -1091,9 +1112,100 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
evServiceIndex(setupIndex)), evServiceIndex(setupIndex)),
}, },
}, },
// terminating gateway with 2 instances {
// changing config entry to add a linked service Name: "change the terminating gateway config entry to add a linked service",
// changing config entry to remove a linked service Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evServiceIndex(setupIndex)),
},
},
{
Name: "change the terminating gateway config entry to remove a linked service",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
},
},
// change the terminating gateway config entry to update a linked service (new SNI/CAFile/etc)
// deleting a config entry // deleting a config entry
// deregistering a service behind a terminating gateway (should send no term gateway events) // deregistering a service behind a terminating gateway (should send no term gateway events)
} }
@ -1127,15 +1239,13 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
} }
require.NoError(t, err) require.NoError(t, err)
assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents) assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty())
}) })
} }
} }
func regTerminatingGateway(req *structs.RegisterRequest) error { func regTerminatingGateway(req *structs.RegisterRequest) error {
req.Service.Service = "tgate1"
req.Service.Kind = structs.ServiceKindTerminatingGateway req.Service.Kind = structs.ServiceKindTerminatingGateway
req.Service.ID = "tgate1"
req.Service.Port = 22000 req.Service.Port = 22000
return nil return nil
} }