Merge pull request #10364 from hashicorp/dnephin/streaming-e2e-test
submatview: and Store integration test with stream backend
This commit is contained in:
commit
450cce60a1
|
@ -0,0 +1,382 @@
|
|||
package submatview_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/rpc/subscribe"
|
||||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
)
|
||||
|
||||
func TestStore_IntegrationWithBackend(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
var maxIndex uint64 = 200
|
||||
count := &counter{latest: 3}
|
||||
producers := map[string]*eventProducer{
|
||||
"srv1": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex),
|
||||
"srv2": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex),
|
||||
"srv3": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex),
|
||||
}
|
||||
|
||||
sh := snapshotHandler{producers: producers}
|
||||
handlers := map[stream.Topic]stream.SnapshotFunc{
|
||||
pbsubscribe.Topic_ServiceHealth: sh.Snapshot,
|
||||
}
|
||||
pub := stream.NewEventPublisher(handlers, 10*time.Millisecond)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go pub.Run(ctx)
|
||||
|
||||
store := submatview.NewStore(hclog.New(nil))
|
||||
go store.Run(ctx)
|
||||
|
||||
addr := runServer(t, pub)
|
||||
|
||||
consumers := []*consumer{
|
||||
newConsumer(t, addr, store, "srv1"),
|
||||
newConsumer(t, addr, store, "srv1"),
|
||||
newConsumer(t, addr, store, "srv1"),
|
||||
newConsumer(t, addr, store, "srv2"),
|
||||
newConsumer(t, addr, store, "srv2"),
|
||||
newConsumer(t, addr, store, "srv2"),
|
||||
}
|
||||
|
||||
group, gctx := errgroup.WithContext(ctx)
|
||||
for i := range producers {
|
||||
producer := producers[i]
|
||||
group.Go(func() error {
|
||||
producer.Produce(gctx, pub)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
for i := range consumers {
|
||||
consumer := consumers[i]
|
||||
group.Go(func() error {
|
||||
return consumer.Consume(gctx, maxIndex)
|
||||
})
|
||||
}
|
||||
|
||||
_ = group.Wait()
|
||||
|
||||
for i, consumer := range consumers {
|
||||
t.Run(fmt.Sprintf("consumer %d", i), func(t *testing.T) {
|
||||
require.True(t, len(consumer.states) > 2, "expected more than %d events", len(consumer.states))
|
||||
|
||||
expected := producers[consumer.srvName].nodesByIndex
|
||||
for idx, nodes := range consumer.states {
|
||||
assertDeepEqual(t, idx, expected[idx], nodes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertDeepEqual(t *testing.T, idx uint64, x, y interface{}) {
|
||||
t.Helper()
|
||||
if diff := cmp.Diff(x, y, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Fatalf("assertion failed: values at index %d are not equal\n--- expected\n+++ actual\n%v", idx, diff)
|
||||
}
|
||||
}
|
||||
|
||||
func stateFromUpdates(u cache.UpdateEvent) []string {
|
||||
var result []string
|
||||
for _, node := range u.Result.(*structs.IndexedCheckServiceNodes).Nodes {
|
||||
result = append(result, node.Node.Node)
|
||||
}
|
||||
|
||||
sort.Strings(result)
|
||||
return result
|
||||
}
|
||||
|
||||
func runServer(t *testing.T, pub *stream.EventPublisher) net.Addr {
|
||||
subSrv := &subscribe.Server{
|
||||
Backend: backend{pub: pub},
|
||||
Logger: hclog.New(nil),
|
||||
}
|
||||
srv := grpc.NewServer()
|
||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subSrv)
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
var g errgroup.Group
|
||||
g.Go(func() error {
|
||||
return srv.Serve(lis)
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
srv.Stop()
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Log(err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
return lis.Addr()
|
||||
}
|
||||
|
||||
type backend struct {
|
||||
pub *stream.EventPublisher
|
||||
}
|
||||
|
||||
func (b backend) ResolveTokenAndDefaultMeta(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) (acl.Authorizer, error) {
|
||||
return acl.AllowAll(), nil
|
||||
}
|
||||
|
||||
func (b backend) Forward(string, func(*grpc.ClientConn) error) (handled bool, err error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (b backend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) {
|
||||
return b.pub.Subscribe(req)
|
||||
}
|
||||
|
||||
var _ subscribe.Backend = (*backend)(nil)
|
||||
|
||||
type eventProducer struct {
|
||||
rand *rand.Rand
|
||||
counter *counter
|
||||
topic stream.Topic
|
||||
srvName string
|
||||
nodesByIndex map[uint64][]string
|
||||
nodesLock sync.Mutex
|
||||
maxIndex uint64
|
||||
}
|
||||
|
||||
func newEventProducer(
|
||||
topic stream.Topic,
|
||||
srvName string,
|
||||
counter *counter,
|
||||
maxIndex uint64,
|
||||
) *eventProducer {
|
||||
return &eventProducer{
|
||||
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
counter: counter,
|
||||
nodesByIndex: map[uint64][]string{},
|
||||
topic: topic,
|
||||
srvName: srvName,
|
||||
maxIndex: maxIndex,
|
||||
}
|
||||
}
|
||||
|
||||
var minEventDelay = 5 * time.Millisecond
|
||||
|
||||
func (e *eventProducer) Produce(ctx context.Context, pub *stream.EventPublisher) {
|
||||
var nodes []string
|
||||
var nextID int
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var event stream.Event
|
||||
|
||||
action := e.rand.Intn(3)
|
||||
if len(nodes) == 0 {
|
||||
action = 1
|
||||
}
|
||||
|
||||
idx := e.counter.Next()
|
||||
switch action {
|
||||
|
||||
case 0: // Deregister
|
||||
nodeIdx := e.rand.Intn(len(nodes))
|
||||
node := nodes[nodeIdx]
|
||||
nodes = append(nodes[:nodeIdx], nodes[nodeIdx+1:]...)
|
||||
|
||||
event = stream.Event{
|
||||
Topic: e.topic,
|
||||
Index: idx,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Deregister,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: node},
|
||||
Service: &structs.NodeService{
|
||||
ID: e.srvName,
|
||||
Service: e.srvName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
case 1: // Register new
|
||||
node := nodeName(nextID)
|
||||
nodes = append(nodes, node)
|
||||
nextID++
|
||||
|
||||
event = stream.Event{
|
||||
Topic: e.topic,
|
||||
Index: idx,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: node},
|
||||
Service: &structs.NodeService{
|
||||
ID: e.srvName,
|
||||
Service: e.srvName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
case 2: // Register update
|
||||
node := nodes[e.rand.Intn(len(nodes))]
|
||||
event = stream.Event{
|
||||
Topic: e.topic,
|
||||
Index: idx,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: node},
|
||||
Service: &structs.NodeService{
|
||||
ID: e.srvName,
|
||||
Service: e.srvName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
e.nodesLock.Lock()
|
||||
pub.Publish([]stream.Event{event})
|
||||
e.nodesByIndex[idx] = copyNodeList(nodes)
|
||||
e.nodesLock.Unlock()
|
||||
|
||||
if idx > e.maxIndex {
|
||||
return
|
||||
}
|
||||
|
||||
delay := time.Duration(rand.Intn(25)) * time.Millisecond
|
||||
time.Sleep(minEventDelay + delay)
|
||||
}
|
||||
}
|
||||
|
||||
func nodeName(i int) string {
|
||||
return fmt.Sprintf("node-%d", i)
|
||||
}
|
||||
|
||||
func copyNodeList(nodes []string) []string {
|
||||
result := make([]string, len(nodes))
|
||||
copy(result, nodes)
|
||||
sort.Strings(result)
|
||||
return result
|
||||
}
|
||||
|
||||
type counter struct {
|
||||
latest uint64
|
||||
}
|
||||
|
||||
func (c *counter) Next() uint64 {
|
||||
return atomic.AddUint64(&c.latest, 1)
|
||||
}
|
||||
|
||||
type consumer struct {
|
||||
healthClient *health.Client
|
||||
states map[uint64][]string
|
||||
srvName string
|
||||
}
|
||||
|
||||
func newConsumer(t *testing.T, addr net.Addr, store *submatview.Store, srv string) *consumer {
|
||||
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
|
||||
c := &health.Client{
|
||||
UseStreamingBackend: true,
|
||||
ViewStore: store,
|
||||
MaterializerDeps: health.MaterializerDeps{
|
||||
Conn: conn,
|
||||
Logger: hclog.New(nil),
|
||||
},
|
||||
}
|
||||
|
||||
return &consumer{
|
||||
healthClient: c,
|
||||
states: make(map[uint64][]string),
|
||||
srvName: srv,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consumer) Consume(ctx context.Context, maxIndex uint64) error {
|
||||
req := structs.ServiceSpecificRequest{ServiceName: c.srvName}
|
||||
updateCh := make(chan cache.UpdateEvent, 10)
|
||||
|
||||
group, cctx := errgroup.WithContext(ctx)
|
||||
group.Go(func() error {
|
||||
return c.healthClient.Notify(cctx, req, "", updateCh)
|
||||
})
|
||||
group.Go(func() error {
|
||||
var idx uint64
|
||||
for {
|
||||
if idx >= maxIndex {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case u := <-updateCh:
|
||||
idx = u.Meta.Index
|
||||
c.states[u.Meta.Index] = stateFromUpdates(u)
|
||||
case <-cctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
return group.Wait()
|
||||
}
|
||||
|
||||
type snapshotHandler struct {
|
||||
producers map[string]*eventProducer
|
||||
}
|
||||
|
||||
func (s *snapshotHandler) Snapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
||||
producer := s.producers[req.Key]
|
||||
|
||||
producer.nodesLock.Lock()
|
||||
defer producer.nodesLock.Unlock()
|
||||
idx := atomic.LoadUint64(&producer.counter.latest)
|
||||
|
||||
// look backwards for an index that was used by the producer
|
||||
nodes, ok := producer.nodesByIndex[idx]
|
||||
for !ok && idx > 0 {
|
||||
idx--
|
||||
nodes, ok = producer.nodesByIndex[idx]
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
event := stream.Event{
|
||||
Topic: producer.topic,
|
||||
Index: idx,
|
||||
Payload: state.EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: node},
|
||||
Service: &structs.NodeService{
|
||||
ID: producer.srvName,
|
||||
Service: producer.srvName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
buf.Append([]stream.Event{event})
|
||||
}
|
||||
return idx, nil
|
||||
}
|
Loading…
Reference in New Issue