Remove a racy and failing test
This test is super racy (it's not just a single line). This test also starts failing once streaming is enabled, because the cache rate limit no longer applies to the requests in the test. The queries use streaming instead of the cache. This test is no longer valid, and the functionality is already well tested by TestCacheThrottle. Instead of spending time rewriting this test, let's remove it. ``` WARNING: DATA RACE Read at 0x00c01de410fc by goroutine 735: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:1024 +0x9af github.com/hashicorp/consul/testrpc.WaitForTestAgent() /home/daniel/pers/code/consul/testrpc/wait.go:99 +0x209 github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:966 +0x1ad testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 Previous write at 0x00c01de410fc by goroutine 605: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1.2() /home/daniel/pers/code/consul/agent/agent_test.go:998 +0xe9 Goroutine 735 (running) created at: testing.(*T).Run() /usr/lib/go/src/testing/testing.go:1238 +0x5d7 github.com/hashicorp/consul/agent.TestCacheRateLimit() /home/daniel/pers/code/consul/agent/agent_test.go:961 +0x375 testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 Goroutine 605 (finished) created at: github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:1022 +0x91e github.com/hashicorp/consul/testrpc.WaitForTestAgent() /home/daniel/pers/code/consul/testrpc/wait.go:99 +0x209 github.com/hashicorp/consul/agent.TestCacheRateLimit.func1() /home/daniel/pers/code/consul/agent/agent_test.go:966 +0x1ad testing.tRunner() /usr/lib/go/src/testing/testing.go:1193 +0x202 ```
This commit is contained in:
parent
d0e32cc3ba
commit
62beaa80f3
|
@ -18,7 +18,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,7 +29,6 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/time/rate"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"gopkg.in/square/go-jose.v2/jwt"
|
"gopkg.in/square/go-jose.v2/jwt"
|
||||||
|
|
||||||
|
@ -937,110 +935,6 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCacheRateLimit(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
tests := []struct {
|
|
||||||
// count := number of updates performed (1 every 10ms)
|
|
||||||
count int
|
|
||||||
// rateLimit rate limiting of cache
|
|
||||||
rateLimit float64
|
|
||||||
// Minimum number of updates to see from a cache perspective
|
|
||||||
// We add a value with tolerance to work even on a loaded CI
|
|
||||||
minUpdates int
|
|
||||||
}{
|
|
||||||
// 250 => we have a test running for at least 2.5s
|
|
||||||
{250, 0.5, 1},
|
|
||||||
{250, 1, 1},
|
|
||||||
{300, 2, 2},
|
|
||||||
}
|
|
||||||
for _, currentTest := range tests {
|
|
||||||
t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
|
||||||
tt := currentTest
|
|
||||||
t.Parallel()
|
|
||||||
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
|
|
||||||
defer a.Shutdown()
|
|
||||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
||||||
|
|
||||||
cfg := a.config
|
|
||||||
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
|
|
||||||
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
|
|
||||||
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
|
|
||||||
cfg.Cache.EntryFetchMaxBurst = 1
|
|
||||||
a.reloadConfigInternal(cfg)
|
|
||||||
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
|
|
||||||
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
stillProcessing := true
|
|
||||||
|
|
||||||
injectService := func(i int) {
|
|
||||||
srv := &structs.NodeService{
|
|
||||||
Service: "redis",
|
|
||||||
ID: "redis",
|
|
||||||
Port: 1024 + i,
|
|
||||||
Address: fmt.Sprintf("10.0.1.%d", i%255),
|
|
||||||
}
|
|
||||||
|
|
||||||
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
|
||||||
require.Nil(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
runUpdates := func() {
|
|
||||||
wg.Add(tt.count)
|
|
||||||
for i := 0; i < tt.count; i++ {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
injectService(i)
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
stillProcessing = false
|
|
||||||
}
|
|
||||||
|
|
||||||
getIndex := func(t *testing.T, oldIndex int) int {
|
|
||||||
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
resp := httptest.NewRecorder()
|
|
||||||
a.srv.handler(false).ServeHTTP(resp, req)
|
|
||||||
// Key doesn't actually exist so we should get 404
|
|
||||||
if got, want := resp.Code, http.StatusOK; got != want {
|
|
||||||
t.Fatalf("bad response code got %d want %d", got, want)
|
|
||||||
}
|
|
||||||
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
|
|
||||||
require.NoError(t, err)
|
|
||||||
return index
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
start := time.Now()
|
|
||||||
injectService(0)
|
|
||||||
// Get the first index
|
|
||||||
index := getIndex(t, 0)
|
|
||||||
require.Greater(t, index, 2)
|
|
||||||
go runUpdates()
|
|
||||||
numberOfUpdates := 0
|
|
||||||
for stillProcessing {
|
|
||||||
oldIndex := index
|
|
||||||
index = getIndex(t, oldIndex)
|
|
||||||
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
|
|
||||||
numberOfUpdates++
|
|
||||||
}
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
|
|
||||||
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
|
|
||||||
|
|
||||||
// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
|
|
||||||
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
|
|
||||||
// We must have at least being notified a few times
|
|
||||||
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
Loading…
Reference in New Issue