Fix deadlock in inmemlayer (#11225)
Also tweak autopilot test timings to adapt to things running faster.
This commit is contained in:
parent
410e34326d
commit
df7404e67e
|
@ -108,13 +108,14 @@ func (l *InmemLayer) Listeners() []NetworkListener {
|
|||
// Dial implements NetworkLayer.
|
||||
func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Config) (*tls.Conn, error) {
|
||||
l.l.Lock()
|
||||
defer l.l.Unlock()
|
||||
connectionCh := l.connectionCh
|
||||
|
||||
if addr == l.addr {
|
||||
panic(fmt.Sprintf("%q attempted to dial itself", l.addr))
|
||||
}
|
||||
|
||||
peer, ok := l.peers[addr]
|
||||
l.l.Unlock()
|
||||
if !ok {
|
||||
return nil, errors.New("inmemlayer: no address found")
|
||||
}
|
||||
|
@ -128,9 +129,9 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
|
|||
l.logger.Debug("dailing connection", "node", l.addr, "remote", addr, "alpn", alpn)
|
||||
}
|
||||
|
||||
if l.connectionCh != nil {
|
||||
if connectionCh != nil {
|
||||
select {
|
||||
case l.connectionCh <- &ConnectionInfo{
|
||||
case connectionCh <- &ConnectionInfo{
|
||||
Node: l.addr,
|
||||
Remote: addr,
|
||||
IsServer: false,
|
||||
|
@ -148,7 +149,9 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
|
|||
|
||||
tlsConn := tls.Client(conn, tlsConfig)
|
||||
|
||||
l.l.Lock()
|
||||
l.clientConns[addr] = append(l.clientConns[addr], conn)
|
||||
l.l.Unlock()
|
||||
|
||||
return tlsConn, nil
|
||||
}
|
||||
|
@ -157,14 +160,15 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
|
|||
// needs to be Accepted.
|
||||
func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {
|
||||
l.l.Lock()
|
||||
defer l.l.Unlock()
|
||||
|
||||
if l.listener == nil {
|
||||
l.l.Unlock()
|
||||
return nil, errors.New("inmemlayer: listener not started")
|
||||
}
|
||||
|
||||
_, ok := l.peers[addr]
|
||||
if !ok {
|
||||
l.l.Unlock()
|
||||
return nil, errors.New("inmemlayer: no peer found")
|
||||
}
|
||||
|
||||
|
@ -174,13 +178,16 @@ func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {
|
|||
servConn = newDelayedConn(servConn, l.readerDelay)
|
||||
|
||||
l.servConns[addr] = append(l.servConns[addr], servConn)
|
||||
connectionCh := l.connectionCh
|
||||
pendingConns := l.listener.pendingConns
|
||||
l.l.Unlock()
|
||||
|
||||
if l.logger.IsDebug() {
|
||||
l.logger.Debug("received connection", "node", l.addr, "remote", addr)
|
||||
}
|
||||
if l.connectionCh != nil {
|
||||
if connectionCh != nil {
|
||||
select {
|
||||
case l.connectionCh <- &ConnectionInfo{
|
||||
case connectionCh <- &ConnectionInfo{
|
||||
Node: l.addr,
|
||||
Remote: addr,
|
||||
IsServer: true,
|
||||
|
@ -191,7 +198,7 @@ func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {
|
|||
}
|
||||
|
||||
select {
|
||||
case l.listener.pendingConns <- servConn:
|
||||
case pendingConns <- servConn:
|
||||
case <-time.After(2 * time.Second):
|
||||
return nil, errors.New("inmemlayer: timeout while accepting connection")
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -75,6 +76,7 @@ func TestRaft_Autopilot_Stabilization_And_State(t *testing.T) {
|
|||
cluster.UnsealCore(t, core)
|
||||
}
|
||||
joinFunc(core)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
state, err = client.Sys().RaftAutopilotState()
|
||||
require.NoError(t, err)
|
||||
|
@ -225,7 +227,7 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
|
|||
"snapshot_threshold": "50",
|
||||
"trailing_logs": "100",
|
||||
"autopilot_reconcile_interval": "1s",
|
||||
"snapshot_interval": "5s",
|
||||
"snapshot_interval": "1s",
|
||||
}
|
||||
if coreIdx == 2 {
|
||||
config["snapshot_delay"] = timeToHealthyCore2.String()
|
||||
|
@ -250,7 +252,7 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
|
|||
require.Equal(t, "leader", state.Servers["core-0"].Status)
|
||||
|
||||
_, err = client.Logical().Write("sys/storage/raft/autopilot/configuration", map[string]interface{}{
|
||||
"server_stabilization_time": "3s",
|
||||
"server_stabilization_time": "5s",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -284,22 +286,10 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
|
|||
cluster.UnsealCore(t, core)
|
||||
}
|
||||
|
||||
checkState := func(nodeID string, numServers int, allHealthy bool, healthy bool, suffrage string) {
|
||||
state, err = client.Sys().RaftAutopilotState()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, allHealthy, state.Healthy)
|
||||
require.Len(t, state.Servers, numServers)
|
||||
require.Equal(t, healthy, state.Servers[nodeID].Healthy)
|
||||
require.Equal(t, "alive", state.Servers[nodeID].NodeStatus)
|
||||
require.Equal(t, suffrage, state.Servers[nodeID].Status)
|
||||
}
|
||||
|
||||
joinFunc(cluster.Cores[1])
|
||||
checkState("core-1", 2, false, false, "non-voter")
|
||||
joinFunc(cluster.Cores[2])
|
||||
|
||||
core2shouldBeHealthyAt := time.Now().Add(timeToHealthyCore2)
|
||||
joinFunc(cluster.Cores[2])
|
||||
checkState("core-2", 3, false, false, "non-voter")
|
||||
|
||||
stabilizationWaitDuration := time.Duration(1.25 * float64(config.ServerStabilizationTime))
|
||||
deadline := time.Now().Add(stabilizationWaitDuration)
|
||||
|
@ -307,12 +297,12 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
|
|||
for time.Now().Before(deadline) {
|
||||
state, err := client.Sys().RaftAutopilotState()
|
||||
require.NoError(t, err)
|
||||
core1healthy = state.Servers["core-1"].Healthy
|
||||
core2healthy = state.Servers["core-2"].Healthy
|
||||
core1healthy = state.Servers["core-1"] != nil && state.Servers["core-1"].Healthy
|
||||
core2healthy = state.Servers["core-2"] != nil && state.Servers["core-2"].Healthy
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
if !core1healthy || core2healthy {
|
||||
t.Fatalf("expected health: core1=true and core2=false, got: core=%v, core2=%v", core1healthy, core2healthy)
|
||||
t.Fatalf("expected health: core1=true and core2=false, got: core1=%v, core2=%v", core1healthy, core2healthy)
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second) // wait for reconciliation
|
||||
|
@ -338,7 +328,7 @@ func TestRaft_Autopilot_Stabilization_Delay(t *testing.T) {
|
|||
break
|
||||
}
|
||||
}
|
||||
require.Equal(t, state.Voters, []string{"core-0", "core-1", "core-2"})
|
||||
require.Equal(t, []string{"core-0", "core-1", "core-2"}, state.Voters)
|
||||
}
|
||||
|
||||
func TestRaft_AutoPilot_Peersets_Equivalent(t *testing.T) {
|
||||
|
@ -373,20 +363,28 @@ func TestRaft_AutoPilot_Peersets_Equivalent(t *testing.T) {
|
|||
joinFunc(cluster.Cores[1])
|
||||
joinFunc(cluster.Cores[2])
|
||||
|
||||
// Make sure all nodes have an equivalent configuration
|
||||
core0Peers, err := cluster.Cores[0].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
core1Peers, err := cluster.Cores[1].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
core2Peers, err := cluster.Cores[2].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
deadline := time.Now().Add(10 * time.Second)
|
||||
var core0Peers, core1Peers, core2Peers []raft.Peer
|
||||
for time.Now().Before(deadline) {
|
||||
// Make sure all nodes have an equivalent configuration
|
||||
core0Peers, err = cluster.Cores[0].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
core1Peers, err = cluster.Cores[1].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
core2Peers, err = cluster.Cores[2].UnderlyingRawStorage.(*raft.RaftBackend).Peers(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(core0Peers, core1Peers) && reflect.DeepEqual(core1Peers, core2Peers) {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
require.Equal(t, core0Peers, core1Peers)
|
||||
require.Equal(t, core1Peers, core2Peers)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue