Eliminate global that caused race tests to fail in ent with an internal config setting. (#9604)

This commit is contained in:
ncabatoff 2020-07-27 16:10:26 -04:00 committed by GitHub
parent a837322897
commit 003bccd16e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 52 deletions

View File

@ -311,7 +311,10 @@ func (c *Core) startClusterListener(ctx context.Context) error {
networkLayer = cluster.NewTCPLayer(c.clusterListenerAddrs, c.logger.Named("cluster-listener.tcp"))
}
c.clusterListener.Store(cluster.NewListener(networkLayer, c.clusterCipherSuites, c.logger.Named("cluster-listener")))
c.clusterListener.Store(cluster.NewListener(networkLayer,
c.clusterCipherSuites,
c.logger.Named("cluster-listener"),
5*c.clusterHeartbeatInterval))
err := c.getClusterListener().Run(ctx)
if err != nil {

View File

@ -18,11 +18,6 @@ import (
"golang.org/x/net/http2"
)
var (
// Making this a package var allows tests to modify
HeartbeatInterval = 5 * time.Second
)
const (
ListenerAcceptDeadline = 500 * time.Millisecond
)
@ -73,7 +68,7 @@ type Listener struct {
l sync.RWMutex
}
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger) *Listener {
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener {
// Create the HTTP/2 server that will be shared by both RPC and regular
// duties. Doing it this way instead of listening via the server and gRPC
// allows us to re-use the same port via ALPN. We can just tell the server
@ -81,7 +76,7 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo
h2Server := &http2.Server{
// Our forwarding connections heartbeat regularly so anything else we
// want to go away/get cleaned up pretty rapidly
IdleTimeout: 5 * HeartbeatInterval,
IdleTimeout: idleTimeout,
}
return &Listener{

View File

@ -526,6 +526,8 @@ type Core struct {
PR1103disabled bool
quotaManager *quotas.Manager
clusterHeartbeatInterval time.Duration
}
// CoreConfig is used to parameterize a core
@ -615,44 +617,8 @@ type CoreConfig struct {
RecoveryMode bool
ClusterNetworkLayer cluster.NetworkLayer
}
func (c *CoreConfig) Clone() *CoreConfig {
return &CoreConfig{
DevToken: c.DevToken,
LogicalBackends: c.LogicalBackends,
CredentialBackends: c.CredentialBackends,
AuditBackends: c.AuditBackends,
Physical: c.Physical,
HAPhysical: c.HAPhysical,
ServiceRegistration: c.ServiceRegistration,
Seal: c.Seal,
Logger: c.Logger,
DisableCache: c.DisableCache,
DisableMlock: c.DisableMlock,
CacheSize: c.CacheSize,
StorageType: c.StorageType,
RedirectAddr: c.RedirectAddr,
ClusterAddr: c.ClusterAddr,
DefaultLeaseTTL: c.DefaultLeaseTTL,
MaxLeaseTTL: c.MaxLeaseTTL,
ClusterName: c.ClusterName,
ClusterCipherSuites: c.ClusterCipherSuites,
EnableUI: c.EnableUI,
EnableRaw: c.EnableRaw,
PluginDirectory: c.PluginDirectory,
DisableSealWrap: c.DisableSealWrap,
ReloadFuncs: c.ReloadFuncs,
ReloadFuncsLock: c.ReloadFuncsLock,
LicensingConfig: c.LicensingConfig,
DevLicenseDuration: c.DevLicenseDuration,
DisablePerformanceStandby: c.DisablePerformanceStandby,
DisableIndexing: c.DisableIndexing,
AllLoggers: c.AllLoggers,
CounterSyncInterval: c.CounterSyncInterval,
ClusterNetworkLayer: c.ClusterNetworkLayer,
entCoreConfig: c.entCoreConfig.Clone(),
}
ClusterHeartbeatInterval time.Duration
}
// GetServiceRegistration returns the config's ServiceRegistration, or nil if it does
@ -730,6 +696,11 @@ func NewCore(conf *CoreConfig) (*Core, error) {
conf.SecureRandomReader = rand.Reader
}
clusterHeartbeatInterval := conf.ClusterHeartbeatInterval
if clusterHeartbeatInterval == 0 {
clusterHeartbeatInterval = 5 * time.Second
}
// Setup the core
c := &Core{
entCore: entCore{},
@ -756,7 +727,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
cachingDisabled: conf.DisableCache,
clusterName: conf.ClusterName,
clusterNetworkLayer: conf.ClusterNetworkLayer,
clusterPeerClusterAddrsCache: cache.New(3*cluster.HeartbeatInterval, time.Second),
clusterPeerClusterAddrsCache: cache.New(3*clusterHeartbeatInterval, time.Second),
enableMlock: !conf.DisableMlock,
rawEnabled: conf.EnableRaw,
shutdownDoneCh: make(chan struct{}),
@ -783,9 +754,10 @@ func NewCore(conf *CoreConfig) (*Core, error) {
requests: new(uint64),
syncInterval: syncInterval,
},
recoveryMode: conf.RecoveryMode,
postUnsealStarted: new(uint32),
raftJoinDoneCh: make(chan struct{}),
recoveryMode: conf.RecoveryMode,
postUnsealStarted: new(uint32),
raftJoinDoneCh: make(chan struct{}),
clusterHeartbeatInterval: clusterHeartbeatInterval,
}
c.standbyStopCh.Store(make(chan struct{}))

View File

@ -46,7 +46,7 @@ func NewRequestForwardingHandler(c *Core, fws *http2.Server, perfStandbySlots ch
fwRPCServer := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 2 * cluster.HeartbeatInterval,
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
@ -279,7 +279,7 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
grpc.WithInsecure(), // it's not, we handle it in the dialer
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 2 * cluster.HeartbeatInterval,
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
@ -295,7 +295,7 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
c.rpcForwardingClient = &forwardingClient{
RequestForwardingClient: NewRequestForwardingClient(c.rpcClientConn),
core: c,
echoTicker: time.NewTicker(cluster.HeartbeatInterval),
echoTicker: time.NewTicker(c.clusterHeartbeatInterval),
echoContext: dctx,
}
c.rpcForwardingClient.startHeartbeat()

View File

@ -1447,6 +1447,11 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
coreConfig.RecoveryMode = base.RecoveryMode
}
if coreConfig.ClusterHeartbeatInterval == 0 {
// Set this lower so that state populates quickly to standby nodes
coreConfig.ClusterHeartbeatInterval = 2 * time.Second
}
if coreConfig.RawConfig == nil {
c := new(server.Config)
c.SharedConfig = &configutil.SharedConfig{LogFormat: logging.UnspecifiedFormat.String()}