submatview: setup testing structure

This commit is contained in:
Daniel Nephin 2021-02-22 16:27:18 -05:00
parent e4c503c28e
commit f46a830e48
5 changed files with 357 additions and 68 deletions

View File

@ -1,66 +0,0 @@
package cachetype
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
expectedNamespace string
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
expectedNamespace: ns,
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != t.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, t.expectedNamespace)
}
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}

View File

@ -80,6 +80,18 @@ func NewMaterializer(deps Deps) *Materializer {
retryWaiter: deps.Waiter,
updateCh: make(chan struct{}),
}
if deps.Waiter == nil {
v.retryWaiter = &retry.Waiter{
MinFailures: 1,
// Start backing off with small increments (200-400ms) which will double
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
// after that). (retry.Wait applies Max limit after jitter right now).
Factor: 200 * time.Millisecond,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
}
}
return v
}

View File

@ -19,6 +19,8 @@ type Store struct {
type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// TODO: add watchCount
}
// TODO: start expiration loop
@ -29,13 +31,47 @@ func NewStore() *Store {
}
}
var ttl = 20 * time.Minute
// Run the expiration loop until the context is cancelled.
func (s *Store) Run(ctx context.Context) {
for {
s.lock.RLock()
timer := s.expiryHeap.Next()
s.lock.RUnlock()
select {
case <-ctx.Done():
timer.Stop()
return
case <-s.expiryHeap.NotifyCh:
timer.Stop()
continue
case <-timer.Wait():
s.lock.Lock()
he := timer.Entry
s.expiryHeap.Remove(he.Index())
// TODO: expiry here
// if e.watchCount == 0 {}
e := s.byKey[he.Key()]
e.stop()
//delete(s.entries, entry.Key())
s.lock.Unlock()
}
}
}
// TODO: godoc
var idleTTL = 20 * time.Minute
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(
ctx context.Context,
// TODO: remove typ param, make it part of the Request interface.
typ string,
req Request,
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
@ -45,7 +81,7 @@ func (s *Store) Get(
e := s.getEntry(key, req.NewMaterializer)
// TODO: requires a lock to update the heap.
s.expiryHeap.Update(e.expiry.Index(), ttl)
//s.expiryHeap.Update(e.expiry.Index(), info.Timeout + ttl)
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
// TODO: pass context instead of Done chan, also replaces Timeout param
@ -114,12 +150,18 @@ func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
s.lock.Lock()
defer s.lock.Unlock()
// Check again after acquiring the write lock, in case we raced to create the entry.
e, ok = s.byKey[key]
if ok {
return e
}
e = entry{materializer: newMat()}
ctx, cancel := context.WithCancel(context.Background())
e.stop = cancel
go e.materializer.Run(ctx)
s.byKey[key] = e
return e
}

View File

@ -0,0 +1,132 @@
package submatview
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
func TestStore_Get_Fresh(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore()
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(
newEndOfSnapshotEvent(2),
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1"))
result, md, err := store.Get(ctx, "test", req)
require.NoError(t, err)
require.Equal(t, uint64(22), md.Index)
r, ok := result.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey("test", req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
store.lock.Lock()
defer store.lock.Unlock()
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}
type fakeRequest struct {
client *TestStreamingClient
}
func (r *fakeRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Key: "key",
Token: "abcd",
Datacenter: "dc1",
Timeout: 4 * time.Second,
}
}
func (r *fakeRequest) NewMaterializer() *Materializer {
return NewMaterializer(Deps{
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
Client: r.client,
Logger: hclog.New(nil),
Request: func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "key",
Token: "abcd",
Datacenter: "dc1",
Index: index,
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}
return req
},
})
}
type fakeView struct {
srvs map[string]*pbservice.CheckServiceNode
}
func (f *fakeView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
id := serviceHealth.CheckServiceNode.UniqueID()
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
f.srvs[id] = serviceHealth.CheckServiceNode
case pbsubscribe.CatalogOp_Deregister:
delete(f.srvs, id)
}
}
return nil
}
func (f *fakeView) Result(index uint64) (interface{}, error) {
srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
for _, srv := range f.srvs {
srvs = append(srvs, srv)
}
return fakeResult{srvs: srvs, index: index}, nil
}
type fakeResult struct {
srvs []*pbservice.CheckServiceNode
index uint64
}
func (f *fakeView) Reset() {
f.srvs = make(map[string]*pbservice.CheckServiceNode)
}
// TODO: Get with an entry that already has index
// TODO: Get with an entry that is not yet at index
func TestStore_Notify(t *testing.T) {
// TODO: Notify with no existing entry
// TODO: Notify with Get
// TODO: Notify multiple times same key
// TODO: Notify no update if index is not past MinIndex.
}

View File

@ -0,0 +1,169 @@
package submatview
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
expectedNamespace string
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
expectedNamespace: ns,
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != t.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, t.expectedNamespace)
}
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = first
for i := range evs {
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}