diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 52714468f..71a31d272 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2089,6 +2089,10 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru // parseCheckServiceNodes is used to parse through a given set of services, // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. +// +// TODO: idx parameter is not used except as a return value. Remove it. +// TODO: err parameter is only used for early return. Remove it and check from the +// caller. func parseCheckServiceNodes( tx ReadTxn, ws memdb.WatchSet, idx uint64, services structs.ServiceNodes, diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index d18b72070..e88330c9c 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -47,8 +47,6 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot // of stream.Events that describe the current state of a service health query. -// -// TODO: no tests for this yet func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { tx := db.ReadTxn() diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index f7fa21df0..1d1c0f2f3 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -8,15 +8,107 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/proto/pbcommon" - "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" ) +func TestServiceHealthSnapshot(t *testing.T) { + store := NewStateStore(nil) + + counter := newIndexCounter() + err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db")) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web")) + require.NoError(t, err) + err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2)) + require.NoError(t, err) + + fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth) + buf := &snapshotAppender{} + req := stream.SubscribeRequest{Key: "web"} + + idx, err := fn(req, buf) + require.NoError(t, err) + require.Equal(t, counter.Last(), idx) + + expected := [][]stream.Event{ + { + testServiceHealthEvent(t, "web", func(e *stream.Event) error { + e.Index = counter.Last() + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.CreateIndex = 1 + csn.Node.ModifyIndex = 1 + csn.Service.CreateIndex = 2 + csn.Service.ModifyIndex = 2 + csn.Checks[0].CreateIndex = 1 + csn.Checks[0].ModifyIndex = 1 + csn.Checks[1].CreateIndex = 2 + csn.Checks[1].ModifyIndex = 2 + return nil + }), + }, + { + testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error { + e.Index = counter.Last() + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.CreateIndex = 3 + csn.Node.ModifyIndex = 3 + csn.Service.CreateIndex = 3 + csn.Service.ModifyIndex = 3 + for i := range csn.Checks { + csn.Checks[i].CreateIndex = 3 + csn.Checks[i].ModifyIndex = 3 + } + return nil + }), + }, + } + assertDeepEqual(t, expected, buf.events, cmpEvents) +} + +type snapshotAppender struct { + events [][]stream.Event +} + +func (s *snapshotAppender) Append(events []stream.Event) { + s.events = append(s.events, events) +} + +type indexCounter struct { + value uint64 +} + +func (c *indexCounter) Next() uint64 { + c.value++ + return c.value +} + +func (c *indexCounter) Last() uint64 { + return c.value +} + +func newIndexCounter() *indexCounter { + return &indexCounter{} +} + +var _ stream.SnapshotAppender = (*snapshotAppender)(nil) + +func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { + return func(e *stream.Event) error { + e.Index = idx + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.CreateIndex = create + csn.Node.ModifyIndex = modify + csn.Service.CreateIndex = create + csn.Service.ModifyIndex = modify + return nil + } +} + func TestServiceHealthEventsFromChanges(t *testing.T) { cases := []struct { Name string