state: Add a test for ServiceHealthSnapshot
This commit is contained in:
parent
88bbde56da
commit
e8beda4685
|
@ -2089,6 +2089,10 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
|
|||
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||
// and query for an associated node and a set of checks. This is the inner
|
||||
// method used to return a rich set of results from a more simple query.
|
||||
//
|
||||
// TODO: idx parameter is not used except as a return value. Remove it.
|
||||
// TODO: err parameter is only used for early return. Remove it and check from the
|
||||
// caller.
|
||||
func parseCheckServiceNodes(
|
||||
tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
||||
services structs.ServiceNodes,
|
||||
|
|
|
@ -47,8 +47,6 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
|
|||
|
||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||
// of stream.Events that describe the current state of a service health query.
|
||||
//
|
||||
// TODO: no tests for this yet
|
||||
func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
||||
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
||||
tx := db.ReadTxn()
|
||||
|
|
|
@ -8,15 +8,107 @@ import (
|
|||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
func TestServiceHealthSnapshot(t *testing.T) {
|
||||
store := NewStateStore(nil)
|
||||
|
||||
counter := newIndexCounter()
|
||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
|
||||
require.NoError(t, err)
|
||||
|
||||
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
|
||||
buf := &snapshotAppender{}
|
||||
req := stream.SubscribeRequest{Key: "web"}
|
||||
|
||||
idx, err := fn(req, buf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, counter.Last(), idx)
|
||||
|
||||
expected := [][]stream.Event{
|
||||
{
|
||||
testServiceHealthEvent(t, "web", func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = 1
|
||||
csn.Node.ModifyIndex = 1
|
||||
csn.Service.CreateIndex = 2
|
||||
csn.Service.ModifyIndex = 2
|
||||
csn.Checks[0].CreateIndex = 1
|
||||
csn.Checks[0].ModifyIndex = 1
|
||||
csn.Checks[1].CreateIndex = 2
|
||||
csn.Checks[1].ModifyIndex = 2
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = 3
|
||||
csn.Node.ModifyIndex = 3
|
||||
csn.Service.CreateIndex = 3
|
||||
csn.Service.ModifyIndex = 3
|
||||
for i := range csn.Checks {
|
||||
csn.Checks[i].CreateIndex = 3
|
||||
csn.Checks[i].ModifyIndex = 3
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
||||
}
|
||||
|
||||
type snapshotAppender struct {
|
||||
events [][]stream.Event
|
||||
}
|
||||
|
||||
func (s *snapshotAppender) Append(events []stream.Event) {
|
||||
s.events = append(s.events, events)
|
||||
}
|
||||
|
||||
type indexCounter struct {
|
||||
value uint64
|
||||
}
|
||||
|
||||
func (c *indexCounter) Next() uint64 {
|
||||
c.value++
|
||||
return c.value
|
||||
}
|
||||
|
||||
func (c *indexCounter) Last() uint64 {
|
||||
return c.value
|
||||
}
|
||||
|
||||
func newIndexCounter() *indexCounter {
|
||||
return &indexCounter{}
|
||||
}
|
||||
|
||||
var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
|
||||
|
||||
func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
|
||||
return func(e *stream.Event) error {
|
||||
e.Index = idx
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = create
|
||||
csn.Node.ModifyIndex = modify
|
||||
csn.Service.CreateIndex = create
|
||||
csn.Service.ModifyIndex = modify
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceHealthEventsFromChanges(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
|
|
Loading…
Reference in New Issue