From a295d9e5dbf64ec09286e93a169864066d562985 Mon Sep 17 00:00:00 2001 From: Freddy Date: Fri, 12 Jul 2019 09:52:26 -0600 Subject: [PATCH] Flaky test overhaul (#6100) --- agent/cache/watch_test.go | 4 +- agent/consul/acl_endpoint_test.go | 10 +-- agent/consul/acl_replication_test.go | 2 +- agent/consul/catalog_endpoint_test.go | 59 ++++++++--------- agent/consul/leader_test.go | 19 +++--- agent/consul/server_test.go | 50 ++++++++++++--- agent/consul/snapshot_endpoint_test.go | 15 +++-- agent/consul/stats_fetcher_test.go | 15 +++-- agent/health_endpoint_test.go | 11 ++-- agent/local/state_test.go | 87 +++++++++++++------------- api/api_test.go | 1 + command/agent/agent_test.go | 53 ++++------------ command/exec/exec_test.go | 14 +++-- connect/proxy/conn_test.go | 17 +++-- 14 files changed, 188 insertions(+), 169 deletions(-) diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 1995987ce..38661374e 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -247,8 +247,8 @@ func TestCacheNotifyPolling(t *testing.T) { // wait for the next batch of responses events := make([]UpdateEvent, 0) - // 110 is needed to allow for the jitter - timeout := time.After(110 * time.Millisecond) + // At least 110ms is needed to allow for the jitter + timeout := time.After(150 * time.Millisecond) for i := 0; i < 2; i++ { select { diff --git a/agent/consul/acl_endpoint_test.go b/agent/consul/acl_endpoint_test.go index 95c63ddd2..afd00e0c8 100644 --- a/agent/consul/acl_endpoint_test.go +++ b/agent/consul/acl_endpoint_test.go @@ -668,7 +668,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) { t.Run("expired tokens are filtered", func(t *testing.T) { // insert a token that will expire token, err := upsertTestToken(codec, "root", "dc1", func(t *structs.ACLToken) { - t.ExpirationTTL = 20 * time.Millisecond + t.ExpirationTTL = 200 * time.Millisecond }) require.NoError(t, err) @@ -686,8 +686,6 @@ func TestACLEndpoint_TokenRead(t *testing.T) { require.Equal(t, token, resp.Token) }) - time.Sleep(50 * time.Millisecond) - t.Run("not returned when expired", func(t *testing.T) { req := structs.ACLTokenGetRequest{ Datacenter: "dc1", @@ -698,8 +696,10 @@ func TestACLEndpoint_TokenRead(t *testing.T) { resp := structs.ACLTokenResponse{} - require.NoError(t, acl.TokenRead(&req, &resp)) - require.Nil(t, resp.Token) + retry.Run(t, func(r *retry.R) { + require.NoError(r, acl.TokenRead(&req, &resp)) + require.Nil(r, resp.Token) + }) }) }) diff --git a/agent/consul/acl_replication_test.go b/agent/consul/acl_replication_test.go index 53842b37d..e74adf25c 100644 --- a/agent/consul/acl_replication_test.go +++ b/agent/consul/acl_replication_test.go @@ -380,7 +380,7 @@ func TestACLReplication_Tokens(t *testing.T) { retry.Run(t, func(r *retry.R) { _, policy, err := s2.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID) require.NoError(r, err) - require.NotNil(t, policy) + require.NotNil(r, policy) }) // add some local tokens to the secondary DC diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 4556abd82..5c11dc9e8 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1046,26 +1046,28 @@ func TestCatalog_ListNodes_StaleRead(t *testing.T) { QueryOptions: structs.QueryOptions{AllowStale: true}, } var out structs.IndexedNodes - if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil { - t.Fatalf("err: %v", err) - } - found := false - for _, n := range out.Nodes { - if n.Node == "foo" { - found = true + retry.Run(t, func(r *retry.R) { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) } - } - if !found { - t.Fatalf("failed to find foo in %#v", out.Nodes) - } - if out.QueryMeta.LastContact == 0 { - t.Fatalf("should have a last contact time") - } - if !out.QueryMeta.KnownLeader { - t.Fatalf("should have known leader") - } + found := false + for _, n := range out.Nodes { + if n.Node == "foo" { + found = true + } + } + if !found { + r.Fatalf("failed to find foo in %#v", out.Nodes) + } + if out.QueryMeta.LastContact == 0 { + r.Fatalf("should have a last contact time") + } + if !out.QueryMeta.KnownLeader { + r.Fatalf("should have known leader") + } + }) } func TestCatalog_ListNodes_ConsistentRead_Fail(t *testing.T) { @@ -1621,18 +1623,19 @@ func TestCatalog_ListServices_Stale(t *testing.T) { waitForLeader(s1, s2) testrpc.WaitForLeader(t, s2.RPC, "dc1") - if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { - t.Fatalf("err: %v", err) - } - // Should find the services - if len(out.Services) != 1 { - t.Fatalf("bad: %#v", out.Services) - } - - if !out.KnownLeader { - t.Fatalf("should have a leader: %v", out) - } + retry.Run(t, func(r *retry.R) { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { + r.Fatalf("err: %v", err) + } + // Should find the services + if len(out.Services) != 1 { + r.Fatalf("bad: %#v", out.Services) + } + if !out.KnownLeader { + r.Fatalf("should have a leader: %v", out) + } + }) s1.Leave() s1.Shutdown() diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 01b676403..7372bef73 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -11,7 +11,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/net-rpc-msgpackrpc" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" ) @@ -756,20 +756,20 @@ func TestLeader_ReapTombstones(t *testing.T) { // Make sure there's a tombstone. state := s1.fsm.State() - func() { + retry.Run(t, func(r *retry.R) { snap := state.Snapshot() defer snap.Close() stones, err := snap.Tombstones() if err != nil { - t.Fatalf("err: %s", err) + r.Fatalf("err: %s", err) } if stones.Next() == nil { - t.Fatalf("missing tombstones") + r.Fatalf("missing tombstones") } if stones.Next() != nil { - t.Fatalf("unexpected extra tombstones") + r.Fatalf("unexpected extra tombstones") } - }() + }) // Check that the new leader has a pending GC expiration by // watching for the tombstone to get removed. @@ -930,9 +930,9 @@ func TestLeader_ChangeServerID(t *testing.T) { }) defer os.RemoveAll(dir4) defer s4.Shutdown() + joinLAN(t, s4, s1) - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - testrpc.WaitForTestAgent(t, s4.RPC, "dc1") + testrpc.WaitForLeader(t, s4.RPC, "dc1") servers[2] = s4 // While integrating #3327 it uncovered that this test was flaky. The @@ -942,11 +942,10 @@ func TestLeader_ChangeServerID(t *testing.T) { // away the connection if it sees an EOF error, since there's no way // that connection is going to work again. This made this test reliable // since it will make a new connection to s4. - - // Make sure the dead server is removed and we're back to 3 total peers retry.Run(t, func(r *retry.R) { r.Check(wantRaft(servers)) for _, s := range servers { + // Make sure the dead server is removed and we're back below 4 r.Check(wantPeers(s, 3)) } }) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index f79a27aee..c3c719481 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -158,10 +158,18 @@ func testServerWithConfig(t *testing.T, cb func(*Config)) (string, *Server) { if cb != nil { cb(config) } - srv, err := newServer(config) - if err != nil { - t.Fatalf("err: %v", err) - } + + var srv *Server + var err error + + // Retry added to avoid cases where bind addr is already in use + retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) { + srv, err = newServer(config) + if err != nil { + os.RemoveAll(dir) + r.Fatalf("err: %v", err) + } + }) return dir, srv } @@ -686,16 +694,42 @@ func TestServer_Expect(t *testing.T) { r.Check(wantPeers(s3, 3)) }) - // Make sure a leader is elected, grab the current term and then add in - // the fourth server. - testrpc.WaitForLeader(t, s1.RPC, "dc1") - termBefore := s1.raft.Stats()["last_log_term"] + // Join the fourth node. joinLAN(t, s4, s1) // Wait for the new server to see itself added to the cluster. retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2, s3, s4})) }) +} + +// Should not trigger bootstrap and new election when s3 joins, since cluster exists +func TestServer_AvoidReBootstrap(t *testing.T) { + dir1, s1 := testServerDCExpect(t, "dc1", 2) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCExpect(t, "dc1", 0) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCExpect(t, "dc1", 2) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Join the first two servers + joinLAN(t, s2, s1) + + // Make sure a leader is elected, grab the current term and then add in + // the third server. + testrpc.WaitForLeader(t, s1.RPC, "dc1") + termBefore := s1.raft.Stats()["last_log_term"] + joinLAN(t, s3, s1) + + // Wait for the new server to see itself added to the cluster. + retry.Run(t, func(r *retry.R) { + r.Check(wantRaft([]*Server{s1, s2, s3})) + }) // Make sure there's still a leader and that the term didn't change, // so we know an election didn't occur. diff --git a/agent/consul/snapshot_endpoint_test.go b/agent/consul/snapshot_endpoint_test.go index 62ed89fac..f3717b292 100644 --- a/agent/consul/snapshot_endpoint_test.go +++ b/agent/consul/snapshot_endpoint_test.go @@ -12,7 +12,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/net-rpc-msgpackrpc" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" ) // verifySnapshot is a helper that does a snapshot and restore. @@ -294,9 +294,14 @@ func TestSnapshot_ACLDeny(t *testing.T) { } func TestSnapshot_Forward_Leader(t *testing.T) { - t.Parallel() dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Bootstrap = true + c.SerfWANConfig = nil + + // Effectively disable autopilot + // Changes in server config leads flakiness because snapshotting + // fails if there are config changes outstanding + c.AutopilotInterval = 50 * time.Second // Since we are doing multiple restores to the same leader, // the default short time for a reconcile can cause the @@ -306,17 +311,19 @@ func TestSnapshot_Forward_Leader(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Bootstrap = false + c.SerfWANConfig = nil + c.AutopilotInterval = 50 * time.Second }) defer os.RemoveAll(dir2) defer s2.Shutdown() - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") // Try to join. joinLAN(t, s2, s1) - testrpc.WaitForTestAgent(t, s2.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") // Run against the leader and the follower to ensure we forward. When // we changed to Raft protocol version 3, since we only have two servers, diff --git a/agent/consul/stats_fetcher_test.go b/agent/consul/stats_fetcher_test.go index a7829d46b..0148e6657 100644 --- a/agent/consul/stats_fetcher_test.go +++ b/agent/consul/stats_fetcher_test.go @@ -13,7 +13,6 @@ import ( ) func TestStatsFetcher(t *testing.T) { - t.Parallel() dir1, s1 := testServerDCExpect(t, "dc1", 3) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -53,18 +52,18 @@ func TestStatsFetcher(t *testing.T) { defer cancel() stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers()) if len(stats) != 3 { - t.Fatalf("bad: %#v", stats) + r.Fatalf("bad: %#v", stats) } for id, stat := range stats { switch types.NodeID(id) { case s1.config.NodeID, s2.config.NodeID, s3.config.NodeID: // OK default: - t.Fatalf("bad: %s", id) + r.Fatalf("bad: %s", id) } if stat == nil || stat.LastTerm == 0 { - t.Fatalf("bad: %#v", stat) + r.Fatalf("bad: %#v", stat) } } }) @@ -81,20 +80,20 @@ func TestStatsFetcher(t *testing.T) { defer cancel() stats := s1.statsFetcher.Fetch(ctx, s1.LANMembers()) if len(stats) != 2 { - t.Fatalf("bad: %#v", stats) + r.Fatalf("bad: %#v", stats) } for id, stat := range stats { switch types.NodeID(id) { case s1.config.NodeID, s2.config.NodeID: // OK case s3.config.NodeID: - t.Fatalf("bad") + r.Fatalf("bad") default: - t.Fatalf("bad: %s", id) + r.Fatalf("bad: %s", id) } if stat == nil || stat.LastTerm == 0 { - t.Fatalf("bad: %#v", stat) + r.Fatalf("bad: %#v", stat) } } }) diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 9cd61537a..133d90237 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -892,11 +892,12 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) { }, } - testrpc.WaitForLeader(t, a.RPC, dc) - var out struct{} - if err := a.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } + retry.Run(t, func(r *retry.R) { + var out struct{} + if err := a.RPC("Catalog.Register", args, &out); err != nil { + r.Fatalf("err: %v", err) + } + }) t.Run("bc_no_query_value", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing", nil) diff --git a/agent/local/state_test.go b/agent/local/state_test.go index aa66541a5..3bc5f2bd2 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/types" - "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -165,9 +164,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) { addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta delete(meta, structs.MetaSegmentKey) // Added later, not in config. - verify.Values(t, "node id", id, a.Config.NodeID) - verify.Values(t, "tagged addrs", addrs, a.Config.TaggedAddresses) - verify.Values(t, "node meta", meta, a.Config.NodeMeta) + assert.Equal(t, a.Config.NodeID, id) + assert.Equal(t, a.Config.TaggedAddresses, addrs) + assert.Equal(t, a.Config.NodeMeta, meta) // We should have 6 services (consul included) if len(services.NodeServices.Services) != 6 { @@ -593,46 +592,44 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { } var services structs.IndexedNodeServices - if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil { - t.Fatalf("err: %v", err) - } - - // All the services should match - for id, serv := range services.NodeServices.Services { - serv.CreateIndex, serv.ModifyIndex = 0, 0 - switch id { - case "svc_id1": - // tags should be modified but not the port - got := serv - want := &structs.NodeService{ - ID: "svc_id1", - Service: "svc1", - Tags: []string{"tag1_mod"}, - Port: 6100, - EnableTagOverride: true, - Weights: &structs.Weights{ - Passing: 1, - Warning: 1, - }, - } - if !verify.Values(t, "", got, want) { - t.FailNow() - } - case "svc_id2": - got, want := serv, srv2 - if !verify.Values(t, "", got, want) { - t.FailNow() - } - case structs.ConsulServiceID: - // ignore - default: - t.Fatalf("unexpected service: %v", id) + retry.Run(t, func(r *retry.R) { + if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil { + r.Fatalf("err: %v", err) } - } - if err := servicesInSync(a.State, 2); err != nil { - t.Fatal(err) - } + // All the services should match + for id, serv := range services.NodeServices.Services { + serv.CreateIndex, serv.ModifyIndex = 0, 0 + switch id { + case "svc_id1": + // tags should be modified but not the port + got := serv + want := &structs.NodeService{ + ID: "svc_id1", + Service: "svc1", + Tags: []string{"tag1_mod"}, + Port: 6100, + EnableTagOverride: true, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + } + assert.Equal(r, want, got) + case "svc_id2": + got, want := serv, srv2 + assert.Equal(r, want, got) + case structs.ConsulServiceID: + // ignore + default: + r.Fatalf("unexpected service: %v", id) + } + } + + if err := servicesInSync(a.State, 2); err != nil { + r.Fatal(err) + } + }) } func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) { @@ -1060,9 +1057,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { addrs := services.NodeServices.Node.TaggedAddresses meta := services.NodeServices.Node.Meta delete(meta, structs.MetaSegmentKey) // Added later, not in config. - verify.Values(t, "node id", id, a.Config.NodeID) - verify.Values(t, "tagged addrs", addrs, a.Config.TaggedAddresses) - verify.Values(t, "node meta", meta, a.Config.NodeMeta) + assert.Equal(t, a.Config.NodeID, id) + assert.Equal(t, a.Config.TaggedAddresses, addrs) + assert.Equal(t, a.Config.NodeMeta, meta) } // Remove one of the checks diff --git a/api/api_test.go b/api/api_test.go index 36b67cc92..9b6c935f2 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -53,6 +53,7 @@ func makeClientWithConfig( if cb1 != nil { cb1(conf) } + // Create server var server *testutil.TestServer var err error diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index e7b936e04..33bb97c22 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -2,19 +2,16 @@ package agent import ( "fmt" + "github.com/hashicorp/consul/testrpc" "os" "os/exec" "path/filepath" "strings" - "sync" "testing" - "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/consul/version" "github.com/mitchellh/cli" ) @@ -82,55 +79,29 @@ func TestConfigFail(t *testing.T) { } func TestRetryJoin(t *testing.T) { - t.Parallel() a := agent.NewTestAgent(t, t.Name(), "") defer a.Shutdown() + + b := agent.NewTestAgent(t, t.Name(), ` + retry_join = ["`+a.Config.SerfBindAddrLAN.String()+`"] + retry_join_wan = ["`+a.Config.SerfBindAddrWAN.String()+`"] + retry_interval = "100ms" + `) + defer b.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") - shutdownCh := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - - tmpDir := testutil.TempDir(t, "consul") - defer os.RemoveAll(tmpDir) - - args := []string{ - "-server", - "-bind", a.Config.BindAddr.String(), - "-data-dir", tmpDir, - "-node", "Node 11111111-1111-1111-1111-111111111111", - "-node-id", "11111111-1111-1111-1111-111111111111", - "-advertise", a.Config.BindAddr.String(), - "-retry-join", a.Config.SerfBindAddrLAN.String(), - "-retry-interval", "1s", - "-retry-join-wan", a.Config.SerfBindAddrWAN.String(), - "-retry-interval-wan", "1s", - } - - ui := cli.NewMockUi() - cmd := New(ui, "", version.Version, "", "", shutdownCh) - // closing shutdownCh triggers a SIGINT which triggers shutdown without leave - // which will return 1 - if code := cmd.Run(args); code != 1 { - t.Log(ui.ErrorWriter.String()) - t.Fatalf("bad: %d", code) - } - }() - retry.Run(t, func(r *retry.R) { if got, want := len(a.LANMembers()), 2; got != want { r.Fatalf("got %d LAN members want %d", got, want) } + }) + + retry.Run(t, func(r *retry.R) { if got, want := len(a.WANMembers()), 2; got != want { r.Fatalf("got %d WAN members want %d", got, want) } }) - - close(shutdownCh) - wg.Wait() } func TestRetryJoinFail(t *testing.T) { diff --git a/command/exec/exec_test.go b/command/exec/exec_test.go index 70777d8ae..c14114c29 100644 --- a/command/exec/exec_test.go +++ b/command/exec/exec_test.go @@ -89,12 +89,14 @@ func TestExecCommand_CrossDC(t *testing.T) { t.Fatalf("err: %v", err) } - if got, want := len(a1.WANMembers()), 2; got != want { - t.Fatalf("got %d WAN members on a1 want %d", got, want) - } - if got, want := len(a2.WANMembers()), 2; got != want { - t.Fatalf("got %d WAN members on a2 want %d", got, want) - } + retry.Run(t, func(r *retry.R) { + if got, want := len(a1.WANMembers()), 2; got != want { + r.Fatalf("got %d WAN members on a1 want %d", got, want) + } + if got, want := len(a2.WANMembers()), 2; got != want { + r.Fatalf("got %d WAN members on a2 want %d", got, want) + } + }) ui := cli.NewMockUi() c := New(ui, nil) diff --git a/connect/proxy/conn_test.go b/connect/proxy/conn_test.go index 29dcdd3e3..92e92f5cb 100644 --- a/connect/proxy/conn_test.go +++ b/connect/proxy/conn_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -91,9 +92,11 @@ func TestConn(t *testing.T) { require.Nil(t, err) require.Equal(t, "ping 2\n", got) - tx, rx := c.Stats() - assert.Equal(t, uint64(7), tx) - assert.Equal(t, uint64(7), rx) + retry.Run(t, func(r *retry.R) { + tx, rx := c.Stats() + assert.Equal(r, uint64(7), tx) + assert.Equal(r, uint64(7), rx) + }) _, err = src.Write([]byte("pong 1\n")) require.Nil(t, err) @@ -108,9 +111,11 @@ func TestConn(t *testing.T) { require.Nil(t, err) require.Equal(t, "pong 2\n", got) - tx, rx = c.Stats() - assert.Equal(t, uint64(14), tx) - assert.Equal(t, uint64(14), rx) + retry.Run(t, func(r *retry.R) { + tx, rx := c.Stats() + assert.Equal(r, uint64(14), tx) + assert.Equal(r, uint64(14), rx) + }) c.Close()