state: handle terminating gateway events properly in snapshot

Refactored out a function that can be used for both the snapshot and stream of events to translate
an event into an appropriate connect event.

Previously terminating gateway events would have used the wrong key in the snapshot, which would have
caused them to be filtered out later on.

Also removed an unused function, and some commented out code.
This commit is contained in:
Daniel Nephin 2021-02-26 19:39:05 -05:00
parent de3fba8ef3
commit 0d3bb68255
2 changed files with 96 additions and 66 deletions

View File

@ -71,21 +71,24 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
event := stream.Event{
Index: idx,
Topic: topic,
}
payload := EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
payload.overrideKey = n.Service.Proxy.DestinationServiceName
if !connect {
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
continue
}
event.Payload = payload
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
events, err := connectEventsByServiceKind(tx, event)
if err != nil {
return idx, err
}
buf.Append(events)
}
return idx, err
@ -413,43 +416,56 @@ func serviceHealthToConnectEvents(
// Skip non-health or any events already emitted to Connect topic
continue
}
node := getPayloadCheckServiceNode(event.Payload)
if node.Service == nil {
continue
connectEvents, err := connectEventsByServiceKind(tx, event)
if err != nil {
return nil, err
}
connectEvent := event
connectEvent.Topic = topicServiceHealthConnect
switch {
case node.Service.Connect.Native:
result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindConnectProxy:
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = node.Service.Proxy.DestinationServiceName
connectEvent.Payload = payload
result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindTerminatingGateway:
iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta)
if err != nil {
return nil, err
}
// similar to checkServiceNodesTxn -> serviceGatewayNodes
for obj := iter.Next(); obj != nil; obj = iter.Next() {
result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service))
}
default:
// All other cases are not relevant to the connect topic
}
result = append(result, connectEvents...)
}
return result, nil
}
func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Event, error) {
node := getPayloadCheckServiceNode(origEvent.Payload)
if node.Service == nil {
return nil, nil
}
event := origEvent // shallow copy the event
event.Topic = topicServiceHealthConnect
if node.Service.Connect.Native {
return []stream.Event{event}, nil
}
switch node.Service.Kind {
case structs.ServiceKindConnectProxy:
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = node.Service.Proxy.DestinationServiceName
event.Payload = payload
return []stream.Event{event}, nil
case structs.ServiceKindTerminatingGateway:
var result []stream.Event
iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta)
if err != nil {
return nil, err
}
// similar to checkServiceNodesTxn -> serviceGatewayNodes
for obj := iter.Next(); obj != nil; obj = iter.Next() {
result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service))
}
return result, nil
default:
// All other cases are not relevant to the connect topic
}
return nil, nil
}
func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event {
event.Topic = topicServiceHealthConnect
payload := event.Payload.(EventPayloadCheckServiceNode)

View File

@ -85,6 +85,23 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
require.NoError(t, err)
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "web",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
err = store.EnsureConfigEntry(counter.Next(), configEntry)
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "tgate1", regTerminatingGateway))
require.NoError(t, err)
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}
@ -95,10 +112,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
expected := [][]stream.Event{
{
testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.overrideKey = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 1
@ -113,10 +129,9 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
}),
},
{
testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
testServiceHealthEvent(t, "web", evConnectTopic, evNode2, evSidecar, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.overrideKey = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 4
@ -130,6 +145,26 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
return nil
}),
},
{
testServiceHealthEvent(t, "tgate1",
evConnectTopic,
evServiceTermingGateway("web"),
func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 1
csn.Node.ModifyIndex = 1
csn.Service.CreateIndex = 7
csn.Service.ModifyIndex = 7
csn.Checks[0].CreateIndex = 1
csn.Checks[0].ModifyIndex = 1
csn.Checks[1].CreateIndex = 7
csn.Checks[1].ModifyIndex = 7
return nil
}),
},
}
assertDeepEqual(t, expected, buf.events, cmpEvents)
}
@ -161,18 +196,6 @@ func newIndexCounter() *indexCounter {
var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
return func(e *stream.Event) error {
e.Index = idx
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.CreateIndex = create
csn.Node.ModifyIndex = modify
csn.Service.CreateIndex = create
csn.Service.ModifyIndex = modify
return nil
}
}
type serviceHealthTestCase struct {
Name string
Setup func(s *Store, tx *txn) error
@ -1594,15 +1617,6 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error {
csn.Service.Kind = structs.ServiceKindTerminatingGateway
csn.Service.Port = 22000
// Convert the check to point to the right ID now. This isn't totally
// realistic - sidecars should have alias checks etc but this is good enough
// to test this code path.
//if len(csn.Checks) >= 2 {
// csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_terminating_gateway")
// csn.Checks[1].ServiceID = svc + "_terminating_gateway"
// csn.Checks[1].ServiceName = svc + "_terminating_gateway"
//}
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.overrideKey = name