make TestCatalogNodes_Blocking less flaky (#7074)

- Explicitly wait to start the test until the initial AE sync of the node.

- Run the blocking query in the main goroutine to cut down on possible
poor goroutine scheduling issues being to blame for delays.

- If the blocking query is woken up with no index change, rerun the
query. This may happen if the CI server is loaded and time dilation is
happening.
This commit is contained in:
R.B. Boyer 2020-01-21 14:58:50 -06:00 committed by GitHub
parent 791f7baa6b
commit ce7ab8abc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 48 deletions

View File

@ -292,70 +292,74 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WaitForAntiEntropySync())
// Register node
// Run the query
args := &structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedNodes
if err := a.RPC("Catalog.ListNodes", *args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// t.Fatal must be called from the main go routine
// of the test. Because of this we cannot call
// t.Fatal from within the go routines and use
// an error channel instead.
errch := make(chan error, 2)
// Async cause a change
waitIndex := out.Index
start := time.Now()
go func() {
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
start := time.Now()
// register a service after the blocking call
// in order to unblock it.
time.AfterFunc(100*time.Millisecond, func() {
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}
errch <- a.RPC("Catalog.Register", args, &out)
})
// now block
req, _ := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=3s&index=%d", out.Index+1), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodes(resp, req)
if err != nil {
errch <- err
time.Sleep(100 * time.Millisecond)
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
// Should block for a while
if d := time.Since(start); d < 50*time.Millisecond {
errch <- fmt.Errorf("too fast: %v", d)
var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if idx := getIndex(t, resp); idx <= out.Index {
errch <- fmt.Errorf("bad: %v", idx)
}
nodes := obj.(structs.Nodes)
if len(nodes) != 2 {
errch <- fmt.Errorf("bad: %v", obj)
}
errch <- nil
}()
// wait for both go routines to return
if err := <-errch; err != nil {
t.Fatal(err)
const waitDuration = 3 * time.Second
// Re-run the query, if errantly woken up with no change, resume blocking.
var elapsed time.Duration
RUN_BLOCKING_QUERY:
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=%s&index=%d",
waitDuration.String(),
waitIndex), nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if err := <-errch; err != nil {
t.Fatal(err)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
elapsed = time.Since(start)
idx := getIndex(t, resp)
if idx < waitIndex {
t.Fatalf("bad: %v", idx)
} else if idx == waitIndex {
if elapsed > waitDuration {
// This should prevent the loop from running longer than the
// waitDuration
t.Fatalf("too slow: %v", elapsed)
}
goto RUN_BLOCKING_QUERY
}
// Should block at least 100ms before getting the changed results
if elapsed < 100*time.Millisecond {
t.Fatalf("too fast: %v", elapsed)
}
nodes := obj.(structs.Nodes)
if len(nodes) != 2 {
t.Fatalf("bad: %v", obj)
}
}
func TestCatalogNodes_DistanceSort(t *testing.T) {