rpcclient/health: convert tests to the new submatview.Store interface
Also fixes a minor data race in Materializer. Capture the error before releasing the lock.
This commit is contained in:
parent
1a6bff1109
commit
2dfacb2d49
|
@ -0,0 +1,69 @@
|
||||||
|
package health
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// streamClient is a mock StreamingClient for testing that allows
|
||||||
|
// for queueing up custom events to a subscriber.
|
||||||
|
type streamClient struct {
|
||||||
|
pbsubscribe.StateChangeSubscription_SubscribeClient
|
||||||
|
subFn func(*pbsubscribe.SubscribeRequest) error
|
||||||
|
events chan eventOrErr
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventOrErr struct {
|
||||||
|
Err error
|
||||||
|
Event *pbsubscribe.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStreamClient(sub func(req *pbsubscribe.SubscribeRequest) error) *streamClient {
|
||||||
|
if sub == nil {
|
||||||
|
sub = func(*pbsubscribe.SubscribeRequest) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &streamClient{
|
||||||
|
events: make(chan eventOrErr, 32),
|
||||||
|
subFn: sub,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *streamClient) Subscribe(
|
||||||
|
ctx context.Context,
|
||||||
|
req *pbsubscribe.SubscribeRequest,
|
||||||
|
_ ...grpc.CallOption,
|
||||||
|
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
|
||||||
|
if err := t.subFn(req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
t.ctx = ctx
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *streamClient) QueueEvents(events ...*pbsubscribe.Event) {
|
||||||
|
for _, e := range events {
|
||||||
|
t.events <- eventOrErr{Event: e}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *streamClient) QueueErr(err error) {
|
||||||
|
t.events <- eventOrErr{Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *streamClient) Recv() (*pbsubscribe.Event, error) {
|
||||||
|
select {
|
||||||
|
case eoe := <-t.events:
|
||||||
|
if eoe.Err != nil {
|
||||||
|
return nil, eoe.Err
|
||||||
|
}
|
||||||
|
return eoe.Event, nil
|
||||||
|
case <-t.ctx.Done():
|
||||||
|
return nil, t.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,248 +1,29 @@
|
||||||
package health
|
package health
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"context"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
)
|
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/submatview"
|
||||||
"github.com/hashicorp/consul/proto/pbcommon"
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace := pbcommon.DefaultEnterpriseMeta.Namespace
|
|
||||||
client := NewTestStreamingClient(namespace)
|
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
|
||||||
Client: client,
|
|
||||||
Logger: hclog.Default(),
|
|
||||||
}}
|
|
||||||
|
|
||||||
// Initially there are no services registered. Server should send an
|
|
||||||
// EndOfSnapshot message immediately with index of 1.
|
|
||||||
client.QueueEvents(newEndOfSnapshotEvent(1))
|
|
||||||
|
|
||||||
opts := cache.FetchOptions{
|
|
||||||
MinIndex: 0,
|
|
||||||
Timeout: time.Second,
|
|
||||||
}
|
|
||||||
req := &structs.ServiceSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
ServiceName: "web",
|
|
||||||
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
|
||||||
}
|
|
||||||
empty := &structs.IndexedCheckServiceNodes{
|
|
||||||
Nodes: structs.CheckServiceNodes{},
|
|
||||||
QueryMeta: structs.QueryMeta{
|
|
||||||
Index: 1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
runStep(t, "empty snapshot returned", func(t *testing.T) {
|
|
||||||
// Fetch should return an empty
|
|
||||||
// result of the right type with a non-zero index, and in the background begin
|
|
||||||
// streaming updates.
|
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.Equal(t, uint64(1), result.Index)
|
|
||||||
require.Equal(t, empty, result.Value)
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
|
||||||
|
|
||||||
runStep(t, "blocks for timeout", func(t *testing.T) {
|
|
||||||
// Subsequent fetch should block for the timeout
|
|
||||||
start := time.Now()
|
|
||||||
opts.Timeout = 200 * time.Millisecond
|
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
require.True(t, elapsed >= 200*time.Millisecond,
|
|
||||||
"Fetch should have blocked until timeout")
|
|
||||||
|
|
||||||
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
|
||||||
require.Equal(t, empty, result.Value, "result value should not have changed")
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
|
||||||
|
|
||||||
runStep(t, "blocks until update", func(t *testing.T) {
|
|
||||||
// Make another blocking query with a longer timeout and trigger an update
|
|
||||||
// event part way through.
|
|
||||||
start := time.Now()
|
|
||||||
go func() {
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
|
|
||||||
}()
|
|
||||||
|
|
||||||
opts.Timeout = time.Second
|
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
require.True(t, elapsed >= 200*time.Millisecond,
|
|
||||||
"Fetch should have blocked until the event was delivered")
|
|
||||||
require.True(t, elapsed < time.Second,
|
|
||||||
"Fetch should have returned before the timeout")
|
|
||||||
|
|
||||||
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
|
|
||||||
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
|
|
||||||
"result value should contain the new registration")
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
|
||||||
|
|
||||||
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
|
|
||||||
client.QueueErr(tempError("broken pipe"))
|
|
||||||
|
|
||||||
// Next fetch will continue to block until timeout and receive the same
|
|
||||||
// result.
|
|
||||||
start := time.Now()
|
|
||||||
opts.Timeout = 200 * time.Millisecond
|
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
require.True(t, elapsed >= 200*time.Millisecond,
|
|
||||||
"Fetch should have blocked until timeout")
|
|
||||||
|
|
||||||
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
|
||||||
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
|
|
||||||
// But an update should still be noticed due to reconnection
|
|
||||||
client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
|
|
||||||
|
|
||||||
start = time.Now()
|
|
||||||
opts.Timeout = time.Second
|
|
||||||
result, err = typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
elapsed = time.Since(start)
|
|
||||||
require.True(t, elapsed < time.Second,
|
|
||||||
"Fetch should have returned before the timeout")
|
|
||||||
|
|
||||||
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
|
|
||||||
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
|
|
||||||
"result value should contain the new registration")
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
|
||||||
|
|
||||||
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
|
|
||||||
// Wait and send the error while fetcher is waiting
|
|
||||||
go func() {
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
client.QueueErr(errors.New("invalid request"))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Next fetch should return the error
|
|
||||||
start := time.Now()
|
|
||||||
opts.Timeout = time.Second
|
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.Error(t, err)
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
require.True(t, elapsed >= 200*time.Millisecond,
|
|
||||||
"Fetch should have blocked until error was sent")
|
|
||||||
require.True(t, elapsed < time.Second,
|
|
||||||
"Fetch should have returned before the timeout")
|
|
||||||
|
|
||||||
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
|
|
||||||
// We don't require instances to be returned in same order so we use
|
|
||||||
// elementsMatch which is recursive.
|
|
||||||
requireResultsSame(t,
|
|
||||||
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
|
|
||||||
result.Value.(*structs.IndexedCheckServiceNodes),
|
|
||||||
)
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
|
|
||||||
// But an update should still be noticed due to reconnection
|
|
||||||
client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))
|
|
||||||
|
|
||||||
opts.Timeout = time.Second
|
|
||||||
result, err = typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
elapsed = time.Since(start)
|
|
||||||
require.True(t, elapsed < time.Second,
|
|
||||||
"Fetch should have returned before the timeout")
|
|
||||||
|
|
||||||
require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed")
|
|
||||||
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
|
|
||||||
"result value should contain the new registration")
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type tempError string
|
|
||||||
|
|
||||||
func (e tempError) Error() string {
|
|
||||||
return string(e)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e tempError) Temporary() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
|
|
||||||
// the same order of results (which vary due to map usage internally).
|
|
||||||
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
|
|
||||||
require.Equal(t, want.Index, got.Index)
|
|
||||||
|
|
||||||
svcIDs := func(csns structs.CheckServiceNodes) []string {
|
|
||||||
res := make([]string, 0, len(csns))
|
|
||||||
for _, csn := range csns {
|
|
||||||
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
gotIDs := svcIDs(got.Nodes)
|
|
||||||
wantIDs := svcIDs(want.Nodes)
|
|
||||||
|
|
||||||
require.ElementsMatch(t, wantIDs, gotIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getNamespace returns a namespace if namespace support exists, otherwise
|
|
||||||
// returns the empty string. It allows the same tests to work in both oss and ent
|
|
||||||
// without duplicating the tests.
|
|
||||||
func getNamespace(ns string) string {
|
|
||||||
meta := structs.NewEnterpriseMeta(ns)
|
|
||||||
return meta.NamespaceOrEmpty()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOrderingConsistentWithMemDb(t *testing.T) {
|
|
||||||
index := uint64(42)
|
index := uint64(42)
|
||||||
buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
|
buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
|
||||||
newID, err := uuid.GenerateUUID()
|
newID, err := uuid.GenerateUUID()
|
||||||
|
@ -280,29 +61,205 @@ func TestOrderingConsistentWithMemDb(t *testing.T) {
|
||||||
two := buildTestNode("node1", "testService:2")
|
two := buildTestNode("node1", "testService:2")
|
||||||
three := buildTestNode("node2", "testService")
|
three := buildTestNode("node2", "testService")
|
||||||
result := structs.IndexedCheckServiceNodes{
|
result := structs.IndexedCheckServiceNodes{
|
||||||
Nodes: structs.CheckServiceNodes{
|
Nodes: structs.CheckServiceNodes{three, two, zero, one},
|
||||||
three, two, zero, one,
|
QueryMeta: structs.QueryMeta{Index: index},
|
||||||
},
|
|
||||||
QueryMeta: structs.QueryMeta{
|
|
||||||
Index: index,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
sortCheckServiceNodes(&result)
|
sortCheckServiceNodes(&result)
|
||||||
expected := structs.CheckServiceNodes{zero, one, two, three}
|
expected := structs.CheckServiceNodes{zero, one, two, three}
|
||||||
require.Equal(t, expected, result.Nodes)
|
require.Equal(t, expected, result.Nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
|
||||||
|
streamClient := newStreamClient(validateNamespace(namespace))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
store := submatview.NewStore(hclog.New(nil))
|
||||||
|
go store.Run(ctx)
|
||||||
|
|
||||||
|
// Initially there are no services registered. Server should send an
|
||||||
|
// EndOfSnapshot message immediately with index of 1.
|
||||||
|
streamClient.QueueEvents(newEndOfSnapshotEvent(1))
|
||||||
|
|
||||||
|
req := serviceRequestStub{
|
||||||
|
serviceRequest: serviceRequest{
|
||||||
|
ServiceSpecificRequest: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
||||||
|
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
streamClient: streamClient,
|
||||||
|
}
|
||||||
|
empty := &structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: structs.CheckServiceNodes{},
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Index: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
runStep(t, "empty snapshot returned", func(t *testing.T) {
|
||||||
|
result, err := store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, uint64(1), result.Index)
|
||||||
|
require.Equal(t, empty, result.Value)
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "blocks for timeout", func(t *testing.T) {
|
||||||
|
// Subsequent fetch should block for the timeout
|
||||||
|
start := time.Now()
|
||||||
|
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
|
||||||
|
result, err := store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until timeout")
|
||||||
|
|
||||||
|
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
|
||||||
|
require.Equal(t, empty, result.Value, "result value should not have changed")
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
})
|
||||||
|
|
||||||
|
var lastResultValue structs.CheckServiceNodes
|
||||||
|
|
||||||
|
runStep(t, "blocks until update", func(t *testing.T) {
|
||||||
|
// Make another blocking query with a longer timeout and trigger an update
|
||||||
|
// event part way through.
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
|
result, err := store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until the event was delivered")
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
|
||||||
|
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
|
||||||
|
require.Len(t, lastResultValue, 1,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
|
||||||
|
streamClient.QueueErr(tempError("broken pipe"))
|
||||||
|
|
||||||
|
// Next fetch will continue to block until timeout and receive the same
|
||||||
|
// result.
|
||||||
|
start := time.Now()
|
||||||
|
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
|
||||||
|
result, err := store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until timeout")
|
||||||
|
|
||||||
|
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
|
||||||
|
"result index should not have changed")
|
||||||
|
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
|
||||||
|
"result value should not have changed")
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
|
||||||
|
// But an update should still be noticed due to reconnection
|
||||||
|
streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
|
result, err = store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
|
||||||
|
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
|
||||||
|
require.Len(t, lastResultValue, 2,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
|
||||||
|
// Wait and send the error while fetcher is waiting
|
||||||
|
go func() {
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
streamClient.QueueErr(errors.New("invalid request"))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Next fetch should return the error
|
||||||
|
start := time.Now()
|
||||||
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
|
result, err := store.Get(ctx, req)
|
||||||
|
require.Error(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
"Fetch should have blocked until error was sent")
|
||||||
|
require.True(t, elapsed < time.Second,
|
||||||
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
|
||||||
|
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
|
||||||
|
// But an update should still be noticed due to reconnection
|
||||||
|
streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web"))
|
||||||
|
|
||||||
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
|
result, err = store.Get(ctx, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed = time.Since(start)
|
||||||
|
require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
|
||||||
|
|
||||||
|
require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
|
||||||
|
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
|
||||||
|
"result value should contain the new registration")
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type tempError string
|
||||||
|
|
||||||
|
func (e tempError) Error() string {
|
||||||
|
return string(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e tempError) Temporary() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace := getNamespace("ns2")
|
namespace := getNamespace("ns2")
|
||||||
client := NewTestStreamingClient(namespace)
|
client := newStreamClient(validateNamespace(namespace))
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
|
||||||
Client: client,
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
Logger: hclog.Default(),
|
defer cancel()
|
||||||
}}
|
|
||||||
|
store := submatview.NewStore(hclog.New(nil))
|
||||||
|
|
||||||
// Create an initial snapshot of 3 instances on different nodes
|
// Create an initial snapshot of 3 instances on different nodes
|
||||||
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
|
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
|
||||||
|
@ -314,37 +271,28 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
registerServiceWeb(5, 3),
|
registerServiceWeb(5, 3),
|
||||||
newEndOfSnapshotEvent(5))
|
newEndOfSnapshotEvent(5))
|
||||||
|
|
||||||
// This contains the view state so important we share it between calls.
|
req := serviceRequestStub{
|
||||||
opts := cache.FetchOptions{
|
serviceRequest: serviceRequest{
|
||||||
MinIndex: 0,
|
ServiceSpecificRequest: structs.ServiceSpecificRequest{
|
||||||
Timeout: 1 * time.Second,
|
|
||||||
}
|
|
||||||
req := &structs.ServiceSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
||||||
}
|
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
|
||||||
|
},
|
||||||
gatherNodes := func(res interface{}) []string {
|
},
|
||||||
nodes := make([]string, 0, 3)
|
streamClient: client,
|
||||||
r := res.(*structs.IndexedCheckServiceNodes)
|
|
||||||
for _, csn := range r.Nodes {
|
|
||||||
nodes = append(nodes, csn.Node.Node)
|
|
||||||
}
|
|
||||||
// Result will be sorted alphabetically the same way as memdb
|
|
||||||
return nodes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runStep(t, "full snapshot returned", func(t *testing.T) {
|
runStep(t, "full snapshot returned", func(t *testing.T) {
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(5), result.Index)
|
require.Equal(t, uint64(5), result.Index)
|
||||||
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
|
expected := newExpectedNodes("node1", "node2", "node3")
|
||||||
gatherNodes(result.Value))
|
expected.Index = 5
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "blocks until deregistration", func(t *testing.T) {
|
runStep(t, "blocks until deregistration", func(t *testing.T) {
|
||||||
|
@ -358,8 +306,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
|
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
opts.Timeout = time.Second
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
require.True(t, elapsed >= 200*time.Millisecond,
|
require.True(t, elapsed >= 200*time.Millisecond,
|
||||||
|
@ -368,10 +316,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
"Fetch should have returned before the timeout")
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
require.Equal(t, uint64(20), result.Index)
|
require.Equal(t, uint64(20), result.Index)
|
||||||
require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value))
|
expected := newExpectedNodes("node2", "node3")
|
||||||
|
expected.Index = 20
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "server reload is respected", func(t *testing.T) {
|
runStep(t, "server reload is respected", func(t *testing.T) {
|
||||||
|
@ -389,18 +338,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
// Make another blocking query with THE SAME index. It should immediately
|
// Make another blocking query with THE SAME index. It should immediately
|
||||||
// return the new snapshot.
|
// return the new snapshot.
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
opts.Timeout = time.Second
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
require.True(t, elapsed < time.Second,
|
require.True(t, elapsed < time.Second,
|
||||||
"Fetch should have returned before the timeout")
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
require.Equal(t, uint64(50), result.Index)
|
require.Equal(t, uint64(50), result.Index)
|
||||||
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
|
expected := newExpectedNodes("node3", "node4", "node5")
|
||||||
|
expected.Index = 50
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
|
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
|
||||||
|
@ -414,26 +364,54 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
newEndOfSnapshotEvent(50))
|
newEndOfSnapshotEvent(50))
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
opts.MinIndex = 49
|
req.QueryOptions.MinQueryIndex = 49
|
||||||
opts.Timeout = time.Second
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
require.True(t, elapsed < time.Second,
|
require.True(t, elapsed < time.Second,
|
||||||
"Fetch should have returned before the timeout")
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
require.Equal(t, uint64(50), result.Index)
|
require.Equal(t, uint64(50), result.Index)
|
||||||
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
|
expected := newExpectedNodes("node3", "node4", "node5")
|
||||||
|
expected.Index = 50
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
|
||||||
|
result := &structs.IndexedCheckServiceNodes{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{Node: node},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
|
||||||
|
// by Node name.
|
||||||
|
var cmpCheckServiceNodeNames = cmp.Options{
|
||||||
|
cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
|
||||||
|
return x.Node.Node == y.Node.Node
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
|
||||||
|
t.Helper()
|
||||||
|
if diff := cmp.Diff(x, y, opts...); diff != "" {
|
||||||
|
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
|
||||||
namespace := getNamespace("ns3")
|
namespace := getNamespace("ns3")
|
||||||
client := NewTestStreamingClient(namespace)
|
client := newStreamClient(validateNamespace(namespace))
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
|
||||||
Client: client,
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
Logger: hclog.Default(),
|
defer cancel()
|
||||||
}}
|
|
||||||
|
store := submatview.NewStore(hclog.New(nil))
|
||||||
|
|
||||||
// Create an initial snapshot of 3 instances but in a single event batch
|
// Create an initial snapshot of 3 instances but in a single event batch
|
||||||
batchEv := newEventBatchWithEvents(
|
batchEv := newEventBatchWithEvents(
|
||||||
|
@ -444,36 +422,28 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
||||||
batchEv,
|
batchEv,
|
||||||
newEndOfSnapshotEvent(5))
|
newEndOfSnapshotEvent(5))
|
||||||
|
|
||||||
// This contains the view state so important we share it between calls.
|
req := serviceRequestStub{
|
||||||
opts := cache.FetchOptions{
|
serviceRequest: serviceRequest{
|
||||||
MinIndex: 0,
|
ServiceSpecificRequest: structs.ServiceSpecificRequest{
|
||||||
Timeout: 1 * time.Second,
|
|
||||||
}
|
|
||||||
req := &structs.ServiceSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "web",
|
ServiceName: "web",
|
||||||
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
||||||
}
|
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
|
||||||
|
},
|
||||||
gatherNodes := func(res interface{}) []string {
|
},
|
||||||
nodes := make([]string, 0, 3)
|
streamClient: client,
|
||||||
r := res.(*structs.IndexedCheckServiceNodes)
|
|
||||||
for _, csn := range r.Nodes {
|
|
||||||
nodes = append(nodes, csn.Node.Node)
|
|
||||||
}
|
|
||||||
return nodes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runStep(t, "full snapshot returned", func(t *testing.T) {
|
runStep(t, "full snapshot returned", func(t *testing.T) {
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(5), result.Index)
|
require.Equal(t, uint64(5), result.Index)
|
||||||
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
|
|
||||||
gatherNodes(result.Value))
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
expected := newExpectedNodes("node1", "node2", "node3")
|
||||||
opts.LastResult = &result
|
expected.Index = 5
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "batched updates work too", func(t *testing.T) {
|
runStep(t, "batched updates work too", func(t *testing.T) {
|
||||||
|
@ -486,99 +456,226 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
|
||||||
newEventServiceHealthRegister(20, 4, "web"),
|
newEventServiceHealthRegister(20, 4, "web"),
|
||||||
)
|
)
|
||||||
client.QueueEvents(batchEv)
|
client.QueueEvents(batchEv)
|
||||||
opts.Timeout = time.Second
|
req.QueryOptions.MaxQueryTime = time.Second
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(20), result.Index)
|
require.Equal(t, uint64(20), result.Index)
|
||||||
require.ElementsMatch(t, []string{"node2", "node3", "node4"},
|
expected := newExpectedNodes("node2", "node3", "node4")
|
||||||
gatherNodes(result.Value))
|
expected.Index = 20
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_Filtering(t *testing.T) {
|
func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
|
||||||
namespace := getNamespace("ns3")
|
namespace := getNamespace("ns3")
|
||||||
client := NewTestStreamingClient(namespace)
|
streamClient := newStreamClient(validateNamespace(namespace))
|
||||||
typ := StreamingHealthServices{deps: MaterializerDeps{
|
|
||||||
Client: client,
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
Logger: hclog.Default(),
|
defer cancel()
|
||||||
}}
|
|
||||||
|
store := submatview.NewStore(hclog.New(nil))
|
||||||
|
go store.Run(ctx)
|
||||||
|
|
||||||
|
req := serviceRequestStub{
|
||||||
|
serviceRequest: serviceRequest{
|
||||||
|
ServiceSpecificRequest: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Filter: `Node.Node == "node2"`,
|
||||||
|
MaxQueryTime: time.Second,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
streamClient: streamClient,
|
||||||
|
}
|
||||||
|
|
||||||
// Create an initial snapshot of 3 instances but in a single event batch
|
// Create an initial snapshot of 3 instances but in a single event batch
|
||||||
batchEv := newEventBatchWithEvents(
|
batchEv := newEventBatchWithEvents(
|
||||||
newEventServiceHealthRegister(5, 1, "web"),
|
newEventServiceHealthRegister(5, 1, "web"),
|
||||||
newEventServiceHealthRegister(5, 2, "web"),
|
newEventServiceHealthRegister(5, 2, "web"),
|
||||||
newEventServiceHealthRegister(5, 3, "web"))
|
newEventServiceHealthRegister(5, 3, "web"))
|
||||||
client.QueueEvents(
|
streamClient.QueueEvents(
|
||||||
batchEv,
|
batchEv,
|
||||||
newEndOfSnapshotEvent(5))
|
newEndOfSnapshotEvent(5))
|
||||||
|
|
||||||
// This contains the view state so important we share it between calls.
|
|
||||||
opts := cache.FetchOptions{
|
|
||||||
MinIndex: 0,
|
|
||||||
Timeout: 1 * time.Second,
|
|
||||||
}
|
|
||||||
req := &structs.ServiceSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
ServiceName: "web",
|
|
||||||
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
|
|
||||||
QueryOptions: structs.QueryOptions{
|
|
||||||
Filter: `Node.Node == "node2"`,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
gatherNodes := func(res interface{}) []string {
|
|
||||||
nodes := make([]string, 0, 3)
|
|
||||||
r := res.(*structs.IndexedCheckServiceNodes)
|
|
||||||
for _, csn := range r.Nodes {
|
|
||||||
nodes = append(nodes, csn.Node.Node)
|
|
||||||
}
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
runStep(t, "filtered snapshot returned", func(t *testing.T) {
|
runStep(t, "filtered snapshot returned", func(t *testing.T) {
|
||||||
result, err := typ.Fetch(opts, req)
|
result, err := store.Get(ctx, req)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(5), result.Index)
|
require.Equal(t, uint64(5), result.Index)
|
||||||
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
|
expected := newExpectedNodes("node2")
|
||||||
|
expected.Index = 5
|
||||||
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
req.QueryOptions.MinQueryIndex = result.Index
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
|
|
||||||
runStep(t, "filtered updates work too", func(t *testing.T) {
|
runStep(t, "filtered updates work too", func(t *testing.T) {
|
||||||
// Simulate multiple registrations happening in one Txn (so all have same
|
// Simulate multiple registrations happening in one Txn (all have same index)
|
||||||
// index)
|
|
||||||
batchEv := newEventBatchWithEvents(
|
batchEv := newEventBatchWithEvents(
|
||||||
// Deregister an existing node
|
// Deregister an existing node
|
||||||
newEventServiceHealthDeregister(20, 1, "web"),
|
newEventServiceHealthDeregister(20, 1, "web"),
|
||||||
// Register another
|
// Register another
|
||||||
newEventServiceHealthRegister(20, 4, "web"),
|
newEventServiceHealthRegister(20, 4, "web"),
|
||||||
)
|
)
|
||||||
client.QueueEvents(batchEv)
|
streamClient.QueueEvents(batchEv)
|
||||||
opts.Timeout = time.Second
|
result, err := store.Get(ctx, req)
|
||||||
result, err := typ.Fetch(opts, req)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(20), result.Index)
|
require.Equal(t, uint64(20), result.Index)
|
||||||
require.Equal(t, []string{"node2"}, gatherNodes(result.Value))
|
expected := newExpectedNodes("node2")
|
||||||
|
expected.Index = 20
|
||||||
opts.MinIndex = result.Index
|
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
|
||||||
opts.LastResult = &result
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serviceRequestStub overrides NewMaterializer so that test can use a fake
|
||||||
|
// StreamClient.
|
||||||
|
type serviceRequestStub struct {
|
||||||
|
serviceRequest
|
||||||
|
streamClient submatview.StreamClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) {
|
||||||
|
view, err := newHealthView(r.ServiceSpecificRequest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return submatview.NewMaterializer(submatview.Deps{
|
||||||
|
View: view,
|
||||||
|
Client: r.streamClient,
|
||||||
|
Logger: hclog.New(nil),
|
||||||
|
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
|
||||||
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
|
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
|
||||||
|
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
|
||||||
|
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||||
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||||
|
Op: pbsubscribe.CatalogOp_Register,
|
||||||
|
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{
|
||||||
|
ID: nodeID,
|
||||||
|
Node: node,
|
||||||
|
Address: addr,
|
||||||
|
Datacenter: "dc1",
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
ID: svc,
|
||||||
|
Service: svc,
|
||||||
|
Port: 8080,
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
|
||||||
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
|
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||||
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||||
|
Op: pbsubscribe.CatalogOp_Deregister,
|
||||||
|
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{
|
||||||
|
Node: node,
|
||||||
|
},
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
ID: svc,
|
||||||
|
Service: svc,
|
||||||
|
Port: 8080,
|
||||||
|
Weights: &pbservice.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
// The original insertion index since a delete doesn't update
|
||||||
|
// this. This magic value came from state store tests where we
|
||||||
|
// setup at index 10 and then mutate at index 100. It can be
|
||||||
|
// modified by the caller later and makes it easier than having
|
||||||
|
// yet another argument in the common case.
|
||||||
|
CreateIndex: 10,
|
||||||
|
ModifyIndex: 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
|
||||||
|
events := make([]*pbsubscribe.Event, len(evs)+1)
|
||||||
|
events[0] = first
|
||||||
|
for i := range evs {
|
||||||
|
events[i+1] = evs[i]
|
||||||
|
}
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: first.Index,
|
||||||
|
Payload: &pbsubscribe.Event_EventBatch{
|
||||||
|
EventBatch: &pbsubscribe.EventBatch{Events: events},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNamespace returns a namespace if namespace support exists, otherwise
|
||||||
|
// returns the empty string. It allows the same tests to work in both oss and ent
|
||||||
|
// without duplicating the tests.
|
||||||
|
func getNamespace(ns string) string {
|
||||||
|
meta := structs.NewEnterpriseMeta(ns)
|
||||||
|
return meta.NamespaceOrEmpty()
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
|
||||||
|
return func(request *pbsubscribe.SubscribeRequest) error {
|
||||||
|
if request.Namespace != ns {
|
||||||
|
return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if !t.Run(name, fn) {
|
if !t.Run(name, fn) {
|
||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
func TestNewFilterEvaluator(t *testing.T) {
|
func TestNewFilterEvaluator(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
|
|
|
@ -249,8 +249,9 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
|
||||||
result.Index = m.index
|
result.Index = m.index
|
||||||
|
|
||||||
if m.err != nil {
|
if m.err != nil {
|
||||||
|
err := m.err
|
||||||
m.lock.Unlock()
|
m.lock.Unlock()
|
||||||
return result, m.err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Value = m.view.Result(m.index)
|
result.Value = m.view.Result(m.index)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package submatview
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -98,10 +99,11 @@ func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
result, err := e.materializer.getFromView(ctx, info.MinIndex)
|
result, err := e.materializer.getFromView(ctx, info.MinIndex)
|
||||||
|
// context.DeadlineExceeded is translated to nil to match the behaviour of
|
||||||
// TODO: does context.DeadlineExceeded need to be translated into a nil error
|
// agent/cache.Cache.Get.
|
||||||
// to match the old interface?
|
if err == nil || errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue