From 8f47bbe89ac05d3718e8812f3cc1dec6d3c0106c Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 22 Apr 2021 14:08:35 -0400 Subject: [PATCH] rpcclient:health: fix a data race and flake in tests Split the TestStreamingClient into the two logical components the real client uses. This allows us to test multiple clients properly. Previously writing of ctx from multiple Subscribe calls was showing a data race. Once this was fixed a test started to fail because the request had to be made with a greater index, so that the store.Get call did not return immediately. --- agent/submatview/store_test.go | 2 +- agent/submatview/streaming_test.go | 66 ++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 4a0699bae..8a7bfafcd 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -323,7 +323,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - req2 = &fakeRequest{client: req.client, key: "key2"} + req2 = &fakeRequest{client: req.client, key: "key2", index: 22} require.NoError(t, store.Notify(ctx, req2, cID, ch1)) go func() { diff --git a/agent/submatview/streaming_test.go b/agent/submatview/streaming_test.go index f84602179..80fec094f 100644 --- a/agent/submatview/streaming_test.go +++ b/agent/submatview/streaming_test.go @@ -3,23 +3,23 @@ package submatview import ( "context" "fmt" + "sync" "google.golang.org/grpc" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" - "github.com/hashicorp/consul/types" - "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" ) // TestStreamingClient is a mock StreamingClient for testing that allows // for queueing up custom events to a subscriber. type TestStreamingClient struct { - pbsubscribe.StateChangeSubscription_SubscribeClient - events chan eventOrErr - ctx context.Context expectedNamespace string + subClients []*subscribeClient + lock sync.RWMutex + events []eventOrErr } type eventOrErr struct { @@ -28,44 +28,66 @@ type eventOrErr struct { } func NewTestStreamingClient(ns string) *TestStreamingClient { - return &TestStreamingClient{ - events: make(chan eventOrErr, 32), - expectedNamespace: ns, - } + return &TestStreamingClient{expectedNamespace: ns} } -func (t *TestStreamingClient) Subscribe( +func (s *TestStreamingClient) Subscribe( ctx context.Context, req *pbsubscribe.SubscribeRequest, _ ...grpc.CallOption, ) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { - if req.Namespace != t.expectedNamespace { + if req.Namespace != s.expectedNamespace { return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", - req.Namespace, t.expectedNamespace) + req.Namespace, s.expectedNamespace) } - t.ctx = ctx - return t, nil + c := &subscribeClient{ + events: make(chan eventOrErr, 32), + ctx: ctx, + } + s.lock.Lock() + s.subClients = append(s.subClients, c) + for _, event := range s.events { + c.events <- event + } + s.lock.Unlock() + return c, nil } -func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { +type subscribeClient struct { + grpc.ClientStream + events chan eventOrErr + ctx context.Context +} + +func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { + s.lock.Lock() for _, e := range events { - t.events <- eventOrErr{Event: e} + s.events = append(s.events, eventOrErr{Event: e}) + for _, c := range s.subClients { + c.events <- eventOrErr{Event: e} + } } + s.lock.Unlock() } -func (t *TestStreamingClient) QueueErr(err error) { - t.events <- eventOrErr{Err: err} +func (s *TestStreamingClient) QueueErr(err error) { + s.lock.Lock() + s.events = append(s.events, eventOrErr{Err: err}) + for _, c := range s.subClients { + c.events <- eventOrErr{Err: err} + } + s.lock.Unlock() } -func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) { +func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) { select { - case eoe := <-t.events: + case eoe := <-c.events: if eoe.Err != nil { return nil, eoe.Err } return eoe.Event, nil - case <-t.ctx.Done(): - return nil, t.ctx.Err() + case <-c.ctx.Done(): + return nil, c.ctx.Err() } }