Merge pull request #9671 from hashicorp/streaming/terminating-gateway-events

state: Add terminating gateway events for streaming
This commit is contained in:
Daniel Nephin 2021-03-09 14:20:21 -05:00 committed by GitHub
commit 848314782e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 892 additions and 96 deletions

View File

@ -900,18 +900,19 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn,
// compatible destination for the given service name. This will include
// both proxies and native integrations.
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
return s.serviceNodes(ws, serviceName, true, entMeta)
tx := s.db.ReadTxn()
defer tx.Abort()
return serviceNodesTxn(tx, ws, serviceName, true, entMeta)
}
// ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
return s.serviceNodes(ws, serviceName, false, entMeta)
tx := s.db.ReadTxn()
defer tx.Abort()
return serviceNodesTxn(tx, ws, serviceName, false, entMeta)
}
func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Function for lookup
index := "service"
if connect {

View File

@ -23,7 +23,8 @@ type EventPayloadCheckServiceNode struct {
// key is used to override the key used to filter the payload. It is set for
// events in the connect topic to specify the name of the underlying service
// when the change event is for a sidecar or gateway.
key string
overrideKey string
overrideNamespace string
}
func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool {
@ -40,11 +41,15 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
}
name := e.Value.Service.Service
if e.key != "" {
name = e.key
if e.overrideKey != "" {
name = e.overrideKey
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
if e.overrideNamespace != "" {
ns = e.overrideNamespace
}
return (key == "" || strings.EqualFold(key, name)) &&
(namespace == "" || strings.EqualFold(namespace, ns))
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -66,21 +71,24 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
event := stream.Event{
Index: idx,
Topic: topic,
}
payload := EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
payload.key = n.Service.Proxy.DestinationServiceName
if !connect {
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
continue
}
event.Payload = payload
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
events, err := connectEventsByServiceKind(tx, event)
if err != nil {
return idx, err
}
buf.Append(events)
}
return idx, err
@ -123,6 +131,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
var nodeChanges map[string]changeType
var serviceChanges map[nodeServiceTuple]serviceChange
var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange
markNode := func(node string, typ changeType) {
if nodeChanges == nil {
@ -201,6 +210,33 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
}
}
case tableGatewayServices:
gs := changeObject(change).(*structs.GatewayService)
if gs.GatewayKind != structs.ServiceKindTerminatingGateway {
continue
}
gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
if termGatewayChanges == nil {
termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange)
}
_, ok := termGatewayChanges[gs.Gateway]
if !ok {
termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{}
}
switch gsChange.changeType {
case changeUpdate:
after := gsChange.change.After.(*structs.GatewayService)
if gsChange.change.Before.(*structs.GatewayService).IsSame(after) {
continue
}
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
case changeDelete, changeCreate:
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
}
}
}
@ -221,9 +257,6 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
}
for tuple, srvChange := range serviceChanges {
// change may be nil if there was a change that _affected_ the service
// like a change to checks but it didn't actually change the service
// record itself.
if srvChange.changeType == changeDelete {
sn := srvChange.change.Before.(*structs.ServiceNode)
e := newServiceHealthEventDeregister(changes.Index, sn)
@ -265,9 +298,64 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
events = append(events, e)
}
for gatewayName, serviceChanges := range termGatewayChanges {
for serviceName, gsChange := range serviceChanges {
gs := changeObject(gsChange.change).(*structs.GatewayService)
_, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta)
if err != nil {
return nil, err
}
// Always send deregister events for deletes/updates.
if gsChange.changeType != changeCreate {
for _, sn := range nodes {
e := newServiceHealthEventDeregister(changes.Index, sn)
e.Topic = topicServiceHealthConnect
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = serviceName.Name
if gatewayName.EnterpriseMeta.GetNamespace() != serviceName.EnterpriseMeta.GetNamespace() {
payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace()
}
e.Payload = payload
events = append(events, e)
}
}
if gsChange.changeType == changeDelete {
continue
}
// Build service events and append them
for _, sn := range nodes {
tuple := newNodeServiceTupleFromServiceNode(sn)
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
if err != nil {
return nil, err
}
e.Topic = topicServiceHealthConnect
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = serviceName.Name
if gatewayName.EnterpriseMeta.GetNamespace() != serviceName.EnterpriseMeta.GetNamespace() {
payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace()
}
e.Payload = payload
events = append(events, e)
}
}
}
// Duplicate any events that affected connect-enabled instances (proxies or
// native apps) to the relevant Connect topic.
events = append(events, serviceHealthToConnectEvents(events...)...)
connectEvents, err := serviceHealthToConnectEvents(tx, events...)
if err != nil {
return nil, err
}
events = append(events, connectEvents...)
return events, nil
}
@ -285,7 +373,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S
e := newServiceHealthEventDeregister(idx, before)
e.Topic = topicServiceHealthConnect
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = payload.Value.Service.Proxy.DestinationServiceName
payload.overrideKey = payload.Value.Service.Proxy.DestinationServiceName
e.Payload = payload
return e, true
}
@ -318,38 +406,76 @@ func changeTypeFromChange(change memdb.Change) changeType {
// enabled and so of no interest to those subscribers but also involves
// switching connection details to be the proxy instead of the actual instance
// in case of a sidecar.
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
func serviceHealthToConnectEvents(
tx ReadTxn,
events ...stream.Event,
) ([]stream.Event, error) {
var result []stream.Event
for _, event := range events {
if event.Topic != topicServiceHealth {
if event.Topic != topicServiceHealth { // event.Topic == topicServiceHealthConnect
// Skip non-health or any events already emitted to Connect topic
continue
}
node := getPayloadCheckServiceNode(event.Payload)
if node.Service == nil {
continue
connectEvents, err := connectEventsByServiceKind(tx, event)
if err != nil {
return nil, err
}
connectEvent := event
connectEvent.Topic = topicServiceHealthConnect
switch {
case node.Service.Connect.Native:
result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindConnectProxy:
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.key = node.Service.Proxy.DestinationServiceName
connectEvent.Payload = payload
result = append(result, connectEvent)
default:
// ServiceKindTerminatingGateway changes are handled separately.
// All other cases are not relevant to the connect topic
}
result = append(result, connectEvents...)
}
return result
return result, nil
}
func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Event, error) {
node := getPayloadCheckServiceNode(origEvent.Payload)
if node.Service == nil {
return nil, nil
}
event := origEvent // shallow copy the event
event.Topic = topicServiceHealthConnect
if node.Service.Connect.Native {
return []stream.Event{event}, nil
}
switch node.Service.Kind {
case structs.ServiceKindConnectProxy:
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = node.Service.Proxy.DestinationServiceName
event.Payload = payload
return []stream.Event{event}, nil
case structs.ServiceKindTerminatingGateway:
var result []stream.Event
iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta)
if err != nil {
return nil, err
}
// similar to checkServiceNodesTxn -> serviceGatewayNodes
for obj := iter.Next(); obj != nil; obj = iter.Next() {
result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service))
}
return result, nil
default:
// All other cases are not relevant to the connect topic
}
return nil, nil
}
func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event {
event.Topic = topicServiceHealthConnect
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = service.Name
if payload.Value.Service.EnterpriseMeta.GetNamespace() != service.EnterpriseMeta.GetNamespace() {
payload.overrideNamespace = service.EnterpriseMeta.GetNamespace()
}
event.Payload = payload
return event
}
func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode {

View File

@ -0,0 +1,7 @@
// +build !consulent
package state
func withServiceHealthEnterpriseCases(cases []serviceHealthTestCase) []serviceHealthTestCase {
return cases
}

View File

@ -85,6 +85,23 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
require.NoError(t, err)
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "web",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err = store.EnsureConfigEntry(counter.Next(), configEntry)
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "tgate1", regTerminatingGateway))
require.NoError(t, err)
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}
@ -95,10 +112,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
expected := [][]stream.Event{
{
testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.key = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 1
@ -113,10 +129,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
}),
},
{
testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
testServiceHealthEvent(t, "web", evConnectTopic, evNode2, evSidecar, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.key = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 4
@ -130,6 +145,26 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
return nil
}),
},
{
testServiceHealthEvent(t, "tgate1",
evConnectTopic,
evServiceTermingGateway("web"),
func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 1
csn.Node.ModifyIndex = 1
csn.Service.CreateIndex = 7
csn.Service.ModifyIndex = 7
csn.Checks[0].CreateIndex = 1
csn.Checks[0].ModifyIndex = 1
csn.Checks[1].CreateIndex = 7
csn.Checks[1].ModifyIndex = 7
return nil
}),
},
}
assertDeepEqual(t, expected, buf.events, cmpEvents)
}
@ -161,26 +196,19 @@ func newIndexCounter() *indexCounter {
var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
return func(e *stream.Event) error {
e.Index = idx
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.CreateIndex = create
csn.Node.ModifyIndex = modify
csn.Service.CreateIndex = create
csn.Service.ModifyIndex = modify
return nil
}
type serviceHealthTestCase struct {
Name string
Setup func(s *Store, tx *txn) error
Mutate func(s *Store, tx *txn) error
WantEvents []stream.Event
WantErr bool
}
func TestServiceHealthEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
Setup func(s *Store, tx *txn) error
Mutate func(s *Store, tx *txn) error
WantEvents []stream.Event
WantErr bool
}{
setupIndex := uint64(10)
mutateIndex := uint64(100)
cases := []serviceHealthTestCase{
{
Name: "irrelevant events",
Mutate: func(s *Store, tx *txn) error {
@ -480,7 +508,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
evRenameService,
evServiceMutated,
evNodeUnchanged,
evChecksMutated,
evServiceChecksMutated,
),
testServiceHealthDeregistrationEvent(t, "web",
evConnectTopic,
@ -794,14 +822,14 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
evServiceCheckFail,
evNodeUnchanged,
evServiceUnchanged,
evChecksMutated,
evServiceChecksMutated,
),
testServiceHealthEvent(t, "web",
evSidecar,
evServiceCheckFail,
evNodeUnchanged,
evServiceUnchanged,
evChecksMutated,
evServiceChecksMutated,
),
testServiceHealthEvent(t, "web",
evConnectTopic,
@ -809,7 +837,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
evServiceCheckFail,
evNodeUnchanged,
evServiceUnchanged,
evChecksMutated,
evServiceChecksMutated,
),
},
WantErr: false,
@ -1001,7 +1029,546 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged),
},
},
{
Name: "terminating gateway registered with no config entry",
Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
},
},
{
Name: "config entry created with no terminating gateway instance",
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{},
},
{
Name: "terminating gateway registered after config entry exists",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
Mutate: func(s *Store, tx *txn) error {
if err := s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false,
); err != nil {
return err
}
return s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2")),
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evNode2),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evNode2),
},
},
{
Name: "terminating gateway updated after config entry exists",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway, regNodeCheckFail), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
evServiceUnchanged),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
evServiceUnchanged),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evNodeCheckFail,
evNodeUnchanged,
evNodeChecksMutated,
evServiceUnchanged),
},
},
{
Name: "terminating gateway config entry created after gateway exists",
Setup: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evServiceIndex(setupIndex)),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evServiceIndex(setupIndex)),
},
},
{
Name: "change the terminating gateway config entry to add a linked service",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2"),
evServiceIndex(setupIndex)),
},
},
{
Name: "change the terminating gateway config entry to remove a linked service",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
},
},
{
Name: "update a linked service within a terminating gateway config entry",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
CAFile: "foo.crt",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry)
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evServiceIndex(setupIndex)),
},
},
{
Name: "delete a terminating gateway config entry with a linked service",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
err = s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
if err != nil {
return err
}
return s.ensureRegistrationTxn(
tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false)
},
Mutate: func(s *Store, tx *txn) error {
return deleteConfigEntryTxn(tx, tx.Index, structs.TerminatingGateway, "tgate1", structs.DefaultEnterpriseMeta())
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evNode2),
},
},
{
Name: "create an instance of a linked service in a terminating gateway",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t, "srv1", evNodeUnchanged),
},
},
{
Name: "delete an instance of a linked service in a terminating gateway",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
err = s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "srv1"), false)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
return s.deleteServiceTxn(tx, tx.Index, "node1", "srv1", nil)
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t, "srv1"),
},
},
{
Name: "rename a terminating gateway instance",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
configEntry = &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate2",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err = ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
rename := func(req *structs.RegisterRequest) error {
req.Service.Service = "tgate2"
req.Checks[1].ServiceName = "tgate2"
return nil
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway, rename), false)
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway(""),
evNodeUnchanged,
evServiceMutated,
evServiceChecksMutated,
evTerminatingGatewayRenamed("tgate2")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evNodeUnchanged,
evServiceMutated,
evServiceChecksMutated,
evTerminatingGatewayRenamed("tgate2")),
},
},
{
Name: "delete a terminating gateway instance",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
return s.deleteServiceTxn(tx, tx.Index, "node1", "tgate1", structs.DefaultEnterpriseMeta())
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evServiceTermingGateway("")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2")),
},
},
}
cases = withServiceHealthEnterpriseCases(cases)
for _, tc := range cases {
tc := tc
@ -1011,7 +1578,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
if tc.Setup != nil {
// Bypass the publish mechanism for this test or we get into odd
// recursive stuff...
setupTx := s.db.WriteTxn(10)
setupTx := s.db.WriteTxn(setupIndex)
require.NoError(t, tc.Setup(s, setupTx))
// Commit the underlying transaction without using wrapped Commit so we
// avoid the whole event publishing system for setup here. It _should_
@ -1020,7 +1587,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
setupTx.Txn.Commit()
}
tx := s.db.WriteTxn(100)
tx := s.db.WriteTxn(mutateIndex)
require.NoError(t, tc.Mutate(s, tx))
// Note we call the func under test directly rather than publishChanges so
@ -1032,11 +1599,50 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
}
require.NoError(t, err)
assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents)
assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty())
})
}
}
func regTerminatingGateway(req *structs.RegisterRequest) error {
req.Service.Kind = structs.ServiceKindTerminatingGateway
req.Service.Port = 22000
return nil
}
func evServiceTermingGateway(name string) func(e *stream.Event) error {
return func(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
csn.Service.Kind = structs.ServiceKindTerminatingGateway
csn.Service.Port = 22000
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = name
e.Payload = payload
}
return nil
}
}
func evServiceIndex(idx uint64) func(e *stream.Event) error {
return func(e *stream.Event) error {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.Value.Node.CreateIndex = idx
payload.Value.Node.ModifyIndex = idx
payload.Value.Service.CreateIndex = idx
payload.Value.Service.ModifyIndex = idx
for _, check := range payload.Value.Checks {
check.CreateIndex = idx
check.ModifyIndex = idx
}
e.Payload = payload
return nil
}
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
@ -1045,13 +1651,26 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
}
// cmpPartialOrderEvents returns a compare option which sorts events so that
// all events for a particular node/service are grouped together. The sort is
// stable so events with the same node/service retain their relative order.
// all events for a particular topic are grouped together. The sort is
// stable so events with the same key retain their relative order.
//
// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey
// to avoid masking bugs.
var cmpPartialOrderEvents = cmp.Options{
cmpopts.SortSlices(func(i, j stream.Event) bool {
key := func(e stream.Event) string {
csn := getPayloadCheckServiceNode(e.Payload)
return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service)
payload := e.Payload.(EventPayloadCheckServiceNode)
csn := payload.Value
name := csn.Service.Service
if payload.overrideKey != "" {
name = payload.overrideKey
}
ns := csn.Service.EnterpriseMeta.GetNamespace()
if payload.overrideNamespace != "" {
ns = payload.overrideNamespace
}
return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, ns, name)
}
return key(i) < key(j)
}),
@ -1106,7 +1725,9 @@ func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *struc
})
for _, opt := range opts {
err := opt(r)
require.NoError(t, err)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
return r
}
@ -1124,8 +1745,9 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea
csn.Node.Address = "10.10.10.10"
for _, opt := range opts {
err := opt(&e)
require.NoError(t, err)
if err := opt(&e); err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
return e
}
@ -1133,8 +1755,9 @@ func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) strea
func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) stream.Event {
e := newTestEventServiceHealthDeregister(100, 1, svc)
for _, opt := range opts {
err := opt(&e)
require.NoError(t, err)
if err := opt(&e); err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
return e
}
@ -1302,7 +1925,7 @@ func evConnectNative(e *stream.Event) error {
// evConnectTopic option converts the base event to the equivalent event that
// should be published to the connect topic. When needed it should be applied
// first as several other options (notable evSidecar) change behavior subtly
// depending on which topic they are published to and they determin this from
// depending on which topic they are published to and they determine this from
// the event.
func evConnectTopic(e *stream.Event) error {
e.Topic = topicServiceHealthConnect
@ -1339,7 +1962,7 @@ func evSidecar(e *stream.Event) error {
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = svc
payload.overrideKey = svc
e.Payload = payload
}
return nil
@ -1371,12 +1994,12 @@ func evServiceMutated(e *stream.Event) error {
return nil
}
// evChecksMutated option alters the base event service check to set it's
// evServiceChecksMutated option alters the base event service check to set it's
// CreateIndex (but not modify index) to the setup index. This expresses that we
// expect the service check records originally created in setup to have been
// mutated during the update. NOTE: this must be sequenced after
// evServiceUnchanged if both are used.
func evChecksMutated(e *stream.Event) error {
func evServiceChecksMutated(e *stream.Event) error {
getPayloadCheckServiceNode(e.Payload).Checks[1].CreateIndex = 10
getPayloadCheckServiceNode(e.Payload).Checks[1].ModifyIndex = 100
return nil
@ -1428,12 +2051,21 @@ func evRenameService(e *stream.Event) error {
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = csn.Service.Proxy.DestinationServiceName
payload.overrideKey = csn.Service.Proxy.DestinationServiceName
e.Payload = payload
}
return nil
}
func evTerminatingGatewayRenamed(newName string) func(e *stream.Event) error {
return func(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
csn.Service.Service = newName
csn.Checks[1].ServiceName = newName
return nil
}
}
// evNodeMeta option alters the base event node to add some meta data.
func evNodeMeta(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
@ -1669,14 +2301,42 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
},
{
name: "override key match",
payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv1"),
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""),
key: "srv1",
namespace: "ns1",
expected: true,
},
{
name: "override key match",
payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv2"),
name: "override key mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"),
key: "proxy",
namespace: "ns2",
expected: true,
},
{
name: "override namespace mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override both key and namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"),
key: "srv1",
namespace: "ns2",
expected: true,
},
{
name: "override both key and namespace mismatch namespace",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
@ -1701,7 +2361,8 @@ func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServ
}
}
func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayloadCheckServiceNode {
func newPayloadCheckServiceNodeWithOverride(
service, namespace, overrideKey, overrideNamespace string) EventPayloadCheckServiceNode {
return EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
@ -1709,6 +2370,7 @@ func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayl
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
},
},
key: key,
overrideKey: overrideKey,
overrideNamespace: overrideNamespace,
}
}