catalog_events: set the right key for connect snapshots
Add a test for catalog_event snapshot on connect topic
This commit is contained in:
parent
85da1af04c
commit
2cc3282d5d
|
@ -64,11 +64,17 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
|||
event := stream.Event{
|
||||
Index: idx,
|
||||
Topic: topic,
|
||||
Payload: EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &n,
|
||||
},
|
||||
}
|
||||
payload := EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &n,
|
||||
}
|
||||
|
||||
if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
|
||||
payload.key = n.Service.Proxy.DestinationServiceName
|
||||
}
|
||||
|
||||
event.Payload = payload
|
||||
|
||||
// append each event as a separate item so that they can be serialized
|
||||
// separately, to prevent the encoding of one massive message.
|
||||
|
|
|
@ -70,6 +70,70 @@ func TestServiceHealthSnapshot(t *testing.T) {
|
|||
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
||||
}
|
||||
|
||||
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
|
||||
store := NewStateStore(nil)
|
||||
|
||||
counter := newIndexCounter()
|
||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regSidecar))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
|
||||
require.NoError(t, err)
|
||||
|
||||
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
|
||||
buf := &snapshotAppender{}
|
||||
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}
|
||||
|
||||
idx, err := fn(req, buf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, counter.Last(), idx)
|
||||
|
||||
expected := [][]stream.Event{
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
ep := e.Payload.(EventPayloadCheckServiceNode)
|
||||
ep.key = "web"
|
||||
e.Payload = ep
|
||||
csn := ep.Value
|
||||
csn.Node.CreateIndex = 1
|
||||
csn.Node.ModifyIndex = 1
|
||||
csn.Service.CreateIndex = 3
|
||||
csn.Service.ModifyIndex = 3
|
||||
csn.Checks[0].CreateIndex = 1
|
||||
csn.Checks[0].ModifyIndex = 1
|
||||
csn.Checks[1].CreateIndex = 3
|
||||
csn.Checks[1].ModifyIndex = 3
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
ep := e.Payload.(EventPayloadCheckServiceNode)
|
||||
ep.key = "web"
|
||||
e.Payload = ep
|
||||
csn := ep.Value
|
||||
csn.Node.CreateIndex = 4
|
||||
csn.Node.ModifyIndex = 4
|
||||
csn.Service.CreateIndex = 5
|
||||
csn.Service.ModifyIndex = 5
|
||||
csn.Checks[0].CreateIndex = 4
|
||||
csn.Checks[0].ModifyIndex = 4
|
||||
csn.Checks[1].CreateIndex = 5
|
||||
csn.Checks[1].ModifyIndex = 5
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
||||
}
|
||||
|
||||
type snapshotAppender struct {
|
||||
events [][]stream.Event
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue