diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go
deleted file mode 100644
index b12809c3c..000000000
--- a/agent/cache-types/streaming_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package cachetype
-
-import (
-	"context"
-	"fmt"
-
-	"google.golang.org/grpc"
-
-	"github.com/hashicorp/consul/proto/pbsubscribe"
-)
-
-// TestStreamingClient is a mock StreamingClient for testing that allows
-// for queueing up custom events to a subscriber.
-type TestStreamingClient struct {
-	pbsubscribe.StateChangeSubscription_SubscribeClient
-	events            chan eventOrErr
-	ctx               context.Context
-	expectedNamespace string
-}
-
-type eventOrErr struct {
-	Err   error
-	Event *pbsubscribe.Event
-}
-
-func NewTestStreamingClient(ns string) *TestStreamingClient {
-	return &TestStreamingClient{
-		events:            make(chan eventOrErr, 32),
-		expectedNamespace: ns,
-	}
-}
-
-func (t *TestStreamingClient) Subscribe(
-	ctx context.Context,
-	req *pbsubscribe.SubscribeRequest,
-	_ ...grpc.CallOption,
-) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
-	if req.Namespace != t.expectedNamespace {
-		return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
-			req.Namespace, t.expectedNamespace)
-	}
-	t.ctx = ctx
-	return t, nil
-}
-
-func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
-	for _, e := range events {
-		t.events <- eventOrErr{Event: e}
-	}
-}
-
-func (t *TestStreamingClient) QueueErr(err error) {
-	t.events <- eventOrErr{Err: err}
-}
-
-func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
-	select {
-	case eoe := <-t.events:
-		if eoe.Err != nil {
-			return nil, eoe.Err
-		}
-		return eoe.Event, nil
-	case <-t.ctx.Done():
-		return nil, t.ctx.Err()
-	}
-}
diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go
index 51568d7d9..85ac163ba 100644
--- a/agent/submatview/materializer.go
+++ b/agent/submatview/materializer.go
@@ -80,6 +80,18 @@ func NewMaterializer(deps Deps) *Materializer {
 		retryWaiter: deps.Waiter,
 		updateCh:    make(chan struct{}),
 	}
+	if deps.Waiter == nil {
+		v.retryWaiter = &retry.Waiter{
+			MinFailures: 1,
+			// Start backing off with small increments (200-400ms) which will double
+			// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
+			// after that). (retry.Wait applies Max limit after jitter right now).
+			Factor:  200 * time.Millisecond,
+			MinWait: 0,
+			MaxWait: 60 * time.Second,
+			Jitter:  retry.NewJitter(100),
+		}
+	}
 	return v
 }
 
diff --git a/agent/submatview/store.go b/agent/submatview/store.go
index 9df57214b..8cdf0a0a8 100644
--- a/agent/submatview/store.go
+++ b/agent/submatview/store.go
@@ -19,6 +19,8 @@ type Store struct {
 type entry struct {
 	materializer *Materializer
 	expiry       *ttlcache.Entry
+	stop         func()
+	// TODO: add watchCount
 }
 
 // TODO: start expiration loop
@@ -29,13 +31,47 @@ func NewStore() *Store {
 	}
 }
 
-var ttl = 20 * time.Minute
+// Run the expiration loop until the context is cancelled.
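+// Each iteration waits for the next entry in the expiry heap to expire and
+// then stops that entry's materializer; a notification on expiryHeap.NotifyCh
+// restarts the wait against the new next-to-expire entry.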
+func (s *Store) Run(ctx context.Context) {
+	for {
+		s.lock.RLock()
+		timer := s.expiryHeap.Next()
+		s.lock.RUnlock()
+
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return
+		case <-s.expiryHeap.NotifyCh:
+			timer.Stop()
+			continue
+
+		case <-timer.Wait():
+			s.lock.Lock()
+
+			he := timer.Entry
+			s.expiryHeap.Remove(he.Index())
+
+			// TODO: expiry here
+			// if e.watchCount == 0 {}
+			e := s.byKey[he.Key()]
+			e.stop()
+			//delete(s.entries, entry.Key())
+
+			s.lock.Unlock()
+		}
+	}
+}
+
+// TODO: godoc
+var idleTTL = 20 * time.Minute
 
 // Get a value from the store, blocking if the store has not yet seen the
 // req.Index value.
 // See agent/cache.Cache.Get for complete documentation.
 func (s *Store) Get(
 	ctx context.Context,
+	// TODO: remove typ param, make it part of the Request interface.
 	typ string,
 	req Request,
 	// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
@@ -45,7 +81,7 @@ func (s *Store) Get(
 	e := s.getEntry(key, req.NewMaterializer)
 
 	// TODO: requires a lock to update the heap.
-	s.expiryHeap.Update(e.expiry.Index(), ttl)
+	//s.expiryHeap.Update(e.expiry.Index(), info.Timeout + ttl)
 
 	// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
 	// TODO: pass context instead of Done chan, also replaces Timeout param
@@ -114,12 +150,18 @@ func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
+	// Check again after acquiring the write lock, in case we raced to create the entry.
 	e, ok = s.byKey[key]
 	if ok {
 		return e
 	}
 
 	e = entry{materializer: newMat()}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	e.stop = cancel
+	go e.materializer.Run(ctx)
+
 	s.byKey[key] = e
 	return e
 }
diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go
new file mode 100644
index 000000000..a567720fa
--- /dev/null
+++ b/agent/submatview/store_test.go
@@ -0,0 +1,132 @@
+package submatview
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/proto/pbcommon"
+	"github.com/hashicorp/consul/proto/pbservice"
+	"github.com/hashicorp/consul/proto/pbsubscribe"
+)
+
+func TestStore_Get_Fresh(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	store := NewStore()
+	go store.Run(ctx)
+
+	req := &fakeRequest{
+		client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
+	}
+	req.client.QueueEvents(
+		newEndOfSnapshotEvent(2),
+		newEventServiceHealthRegister(10, 1, "srv1"),
+		newEventServiceHealthRegister(22, 2, "srv1"))
+
+	result, md, err := store.Get(ctx, "test", req)
+	require.NoError(t, err)
+	require.Equal(t, uint64(22), md.Index)
+
+	r, ok := result.(fakeResult)
+	require.True(t, ok)
+	require.Len(t, r.srvs, 2)
+	require.Equal(t, uint64(22), r.index)
+
+	require.Len(t, store.byKey, 1)
+	e := store.byKey[makeEntryKey("test", req.CacheInfo())]
+	require.Equal(t, 0, e.expiry.Index())
+
+	store.lock.Lock()
+	defer store.lock.Unlock()
+	require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
+}
+
+type fakeRequest struct {
+	client *TestStreamingClient
+}
+
+func (r *fakeRequest) CacheInfo() cache.RequestInfo {
+	return cache.RequestInfo{
+		Key:        "key",
+		Token:      "abcd",
+		Datacenter: "dc1",
+		Timeout:    4 * time.Second,
+	}
+}
+
+func (r *fakeRequest) NewMaterializer() *Materializer {
+	return NewMaterializer(Deps{
+		View:   &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
+		Client: r.client,
+		Logger: hclog.New(nil),
+		Request: func(index uint64) pbsubscribe.SubscribeRequest {
+			req := pbsubscribe.SubscribeRequest{
+				Topic:      pbsubscribe.Topic_ServiceHealth,
+				Key:        "key",
+				Token:      "abcd",
+				Datacenter: "dc1",
+				Index:      index,
+				Namespace:  pbcommon.DefaultEnterpriseMeta.Namespace,
+			}
+			return req
+		},
+	})
+}
+
+type fakeView struct {
+	srvs map[string]*pbservice.CheckServiceNode
+}
+
+func (f *fakeView) Update(events []*pbsubscribe.Event) error {
+	for _, event := range events {
+		serviceHealth := event.GetServiceHealth()
+		if serviceHealth == nil {
+			return fmt.Errorf("unexpected event type for service health view: %T",
+				event.GetPayload())
+		}
+
+		id := serviceHealth.CheckServiceNode.UniqueID()
+		switch serviceHealth.Op {
+		case pbsubscribe.CatalogOp_Register:
+			f.srvs[id] = serviceHealth.CheckServiceNode
+
+		case pbsubscribe.CatalogOp_Deregister:
+			delete(f.srvs, id)
+		}
+	}
+	return nil
+}
+
+func (f *fakeView) Result(index uint64) (interface{}, error) {
+	srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
+	for _, srv := range f.srvs {
+		srvs = append(srvs, srv)
+	}
+	return fakeResult{srvs: srvs, index: index}, nil
+}
+
+type fakeResult struct {
+	srvs  []*pbservice.CheckServiceNode
+	index uint64
+}
+
+func (f *fakeView) Reset() {
+	f.srvs = make(map[string]*pbservice.CheckServiceNode)
+}
+
+// TODO: Get with an entry that already has index
+// TODO: Get with an entry that is not yet at index
+
+func TestStore_Notify(t *testing.T) {
+	// TODO: Notify with no existing entry
+	// TODO: Notify with Get
+	// TODO: Notify multiple times same key
+	// TODO: Notify no update if index is not past MinIndex.
+}
diff --git a/agent/submatview/streaming_test.go b/agent/submatview/streaming_test.go
new file mode 100644
index 000000000..f84602179
--- /dev/null
+++ b/agent/submatview/streaming_test.go
@@ -0,0 +1,169 @@
+package submatview
+
+import (
+	"context"
+	"fmt"
+
+	"google.golang.org/grpc"
+
+	"github.com/hashicorp/consul/proto/pbcommon"
+	"github.com/hashicorp/consul/proto/pbservice"
+	"github.com/hashicorp/consul/types"
+
+	"github.com/hashicorp/consul/proto/pbsubscribe"
+)
+
+// TestStreamingClient is a mock StreamingClient for testing that allows
+// for queueing up custom events to a subscriber.
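+// Subscribe checks the request against expectedNamespace and returns the
+// client itself as the subscription stream; Recv blocks until an event or
+// error queued via QueueEvents/QueueErr is available, or until the
+// subscription context is cancelled.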
+type TestStreamingClient struct {
+	pbsubscribe.StateChangeSubscription_SubscribeClient
+	events            chan eventOrErr
+	ctx               context.Context
+	expectedNamespace string
+}
+
+type eventOrErr struct {
+	Err   error
+	Event *pbsubscribe.Event
+}
+
+func NewTestStreamingClient(ns string) *TestStreamingClient {
+	return &TestStreamingClient{
+		events:            make(chan eventOrErr, 32),
+		expectedNamespace: ns,
+	}
+}
+
+func (t *TestStreamingClient) Subscribe(
+	ctx context.Context,
+	req *pbsubscribe.SubscribeRequest,
+	_ ...grpc.CallOption,
+) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
+	if req.Namespace != t.expectedNamespace {
+		return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
+			req.Namespace, t.expectedNamespace)
+	}
+	t.ctx = ctx
+	return t, nil
+}
+
+func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
+	for _, e := range events {
+		t.events <- eventOrErr{Event: e}
+	}
+}
+
+func (t *TestStreamingClient) QueueErr(err error) {
+	t.events <- eventOrErr{Err: err}
+}
+
+func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
+	select {
+	case eoe := <-t.events:
+		if eoe.Err != nil {
+			return nil, eoe.Err
+		}
+		return eoe.Event, nil
+	case <-t.ctx.Done():
+		return nil, t.ctx.Err()
+	}
+}
+
+func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
+	return &pbsubscribe.Event{
+		Index:   index,
+		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
+	}
+}
+
+func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
+	return &pbsubscribe.Event{
+		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
+	}
+}
+
+func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
+	node := fmt.Sprintf("node%d", nodeNum)
+	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
+	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
+
+	return &pbsubscribe.Event{
+		Index: index,
+		Payload: &pbsubscribe.Event_ServiceHealth{
+			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
+				Op: pbsubscribe.CatalogOp_Register,
+				CheckServiceNode: &pbservice.CheckServiceNode{
+					Node: &pbservice.Node{
+						ID:         nodeID,
+						Node:       node,
+						Address:    addr,
+						Datacenter: "dc1",
+						RaftIndex: pbcommon.RaftIndex{
+							CreateIndex: index,
+							ModifyIndex: index,
+						},
+					},
+					Service: &pbservice.NodeService{
+						ID:      svc,
+						Service: svc,
+						Port:    8080,
+						RaftIndex: pbcommon.RaftIndex{
+							CreateIndex: index,
+							ModifyIndex: index,
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
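+// newEventServiceHealthDeregister returns a deregistration event for svc on
+// the given node, mirroring newEventServiceHealthRegister above.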
+func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
+	node := fmt.Sprintf("node%d", nodeNum)
+
+	return &pbsubscribe.Event{
+		Index: index,
+		Payload: &pbsubscribe.Event_ServiceHealth{
+			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
+				Op: pbsubscribe.CatalogOp_Deregister,
+				CheckServiceNode: &pbservice.CheckServiceNode{
+					Node: &pbservice.Node{
+						Node: node,
+					},
+					Service: &pbservice.NodeService{
+						ID:      svc,
+						Service: svc,
+						Port:    8080,
+						Weights: &pbservice.Weights{
+							Passing: 1,
+							Warning: 1,
+						},
+						RaftIndex: pbcommon.RaftIndex{
+							// The original insertion index, since a delete doesn't update
+							// this. This magic value came from state store tests where we
+							// set up at index 10 and then mutate at index 100. It can be
+							// modified by the caller later and makes it easier than having
+							// yet another argument in the common case.
+							CreateIndex: 10,
+							ModifyIndex: 10,
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
+	events := make([]*pbsubscribe.Event, len(evs)+1)
+	events[0] = first
+	for i := range evs {
+		events[i+1] = evs[i]
+	}
+	return &pbsubscribe.Event{
+		Index: first.Index,
+		Payload: &pbsubscribe.Event_EventBatch{
+			EventBatch: &pbsubscribe.EventBatch{Events: events},
+		},
+	}
+}
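A rough usage sketch, based only on the signatures exercised in store_test.go above; the package name, the "service-health" type string, and watchOnce are illustrative and not part of this change:

package example

import (
	"context"
	"log"

	"github.com/hashicorp/consul/agent/submatview"
)

// watchOnce is a hypothetical caller of submatview.Store. It starts the
// store's expiration loop and performs a single blocking Get for req.
func watchOnce(ctx context.Context, req submatview.Request) error {
	store := submatview.NewStore()
	go store.Run(ctx) // drives the idle-TTL expiration loop

	// Get blocks if the store has not yet seen the requested index, creating
	// and running a Materializer for the entry key on first use.
	result, meta, err := store.Get(ctx, "service-health", req)
	if err != nil {
		return err
	}
	log.Printf("index=%d result=%v", meta.Index, result)
	return nil
}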