Fix flaky TestCluster_ForwardRequest tests. (#9973)
We now wait to see that clients are aware of a new leader before we hit them, and we wait to see that the leader has setup the forwarding handler.
This commit is contained in:
parent
795b118941
commit
2cb1e03c1b
|
@ -218,7 +218,7 @@ func testCluster_ForwardRequestsCommon(t *testing.T, clusterOpts *TestClusterOpt
|
|||
root := cluster.RootToken
|
||||
|
||||
// Wait for core to become active
|
||||
TestWaitActive(t, cores[0].Core)
|
||||
TestWaitActiveForwardingReady(t, cores[0].Core)
|
||||
|
||||
// Test forwarding a request. Since we're going directly from core to core
|
||||
// with no fallback we know that if it worked, request handling is working
|
||||
|
@ -232,107 +232,78 @@ func testCluster_ForwardRequestsCommon(t *testing.T, clusterOpts *TestClusterOpt
|
|||
//
|
||||
|
||||
// Ensure active core is cores[1] and test
|
||||
err := cores[0].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
_ = cores[2].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
TestWaitActive(t, cores[1].Core)
|
||||
testCluster_ForwardRequests(t, cores[0], root, "core2")
|
||||
testCluster_ForwardRequests(t, cores[2], root, "core2")
|
||||
testCluster_Forwarding(t, cluster, 0, 1, root, "core2")
|
||||
|
||||
// Ensure active core is cores[2] and test
|
||||
err = cores[1].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
_ = cores[0].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
TestWaitActive(t, cores[2].Core)
|
||||
testCluster_ForwardRequests(t, cores[0], root, "core3")
|
||||
testCluster_ForwardRequests(t, cores[1], root, "core3")
|
||||
testCluster_Forwarding(t, cluster, 1, 2, root, "core3")
|
||||
|
||||
// Ensure active core is cores[0] and test
|
||||
err = cores[2].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
_ = cores[1].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
TestWaitActive(t, cores[0].Core)
|
||||
testCluster_ForwardRequests(t, cores[1], root, "core1")
|
||||
testCluster_ForwardRequests(t, cores[2], root, "core1")
|
||||
testCluster_Forwarding(t, cluster, 2, 0, root, "core1")
|
||||
|
||||
// Ensure active core is cores[1] and test
|
||||
err = cores[0].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
_ = cores[2].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
TestWaitActive(t, cores[1].Core)
|
||||
testCluster_ForwardRequests(t, cores[0], root, "core2")
|
||||
testCluster_ForwardRequests(t, cores[2], root, "core2")
|
||||
testCluster_Forwarding(t, cluster, 0, 1, root, "core2")
|
||||
|
||||
// Ensure active core is cores[2] and test
|
||||
err = cores[1].StepDown(context.Background(), &logical.Request{
|
||||
testCluster_Forwarding(t, cluster, 1, 2, root, "core3")
|
||||
}
|
||||
|
||||
func testCluster_Forwarding(t *testing.T, cluster *TestCluster, oldLeaderCoreIdx, newLeaderCoreIdx int, rootToken, remoteCoreID string) {
|
||||
t.Logf("new leaderidx will be %d, stepping down other cores to make it so", newLeaderCoreIdx)
|
||||
err := cluster.Cores[oldLeaderCoreIdx].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
ClientToken: rootToken,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
_ = cores[0].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: root,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
TestWaitActive(t, cores[2].Core)
|
||||
testCluster_ForwardRequests(t, cores[0], root, "core3")
|
||||
testCluster_ForwardRequests(t, cores[1], root, "core3")
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
if i != oldLeaderCoreIdx && i != newLeaderCoreIdx {
|
||||
_ = cluster.Cores[i].StepDown(context.Background(), &logical.Request{
|
||||
Operation: logical.UpdateOperation,
|
||||
Path: "sys/step-down",
|
||||
ClientToken: rootToken,
|
||||
})
|
||||
time.Sleep(clusterTestPausePeriod)
|
||||
}
|
||||
}
|
||||
|
||||
TestWaitActiveForwardingReady(t, cluster.Cores[newLeaderCoreIdx].Core)
|
||||
|
||||
deadline := time.Now().Add(5 * time.Second)
|
||||
var ready int
|
||||
for time.Now().Before(deadline) {
|
||||
for i := 0; i < 3; i++ {
|
||||
if i != newLeaderCoreIdx {
|
||||
leaderParams := cluster.Cores[i].clusterLeaderParams.Load().(*ClusterLeaderParams)
|
||||
if leaderParams != nil && leaderParams.LeaderClusterAddr == cluster.Cores[newLeaderCoreIdx].ClusterAddr() {
|
||||
ready++
|
||||
}
|
||||
}
|
||||
}
|
||||
if ready == 2 {
|
||||
break
|
||||
}
|
||||
ready = 0
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if ready != 2 {
|
||||
t.Fatal("standbys have not discovered the new active node in time")
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
if i != newLeaderCoreIdx {
|
||||
testCluster_ForwardRequests(t, cluster.Cores[i], rootToken, remoteCoreID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testCluster_ForwardRequests(t *testing.T, c *TestClusterCore, rootToken, remoteCoreID string) {
|
||||
t.Helper()
|
||||
|
||||
standby, err := c.Standby()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -696,6 +696,19 @@ func TestWaitActive(t testing.T, core *Core) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestWaitActiveForwardingReady(t testing.T, core *Core) {
|
||||
TestWaitActive(t, core)
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if _, ok := core.getClusterListener().Handler(consts.RequestForwardingALPN); ok {
|
||||
return
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
t.Fatal("timed out waiting for request forwarding handler to be registered")
|
||||
}
|
||||
|
||||
func TestWaitActiveWithError(core *Core) error {
|
||||
start := time.Now()
|
||||
var standby bool
|
||||
|
|
Loading…
Reference in New Issue