backport of commit c67242463c239215a1dbf3b9979787a5f8359bbf (#20830)
Co-authored-by: Nick Cabatoff <ncabatoff@hashicorp.com>
This commit is contained in:
parent
30a01e12e6
commit
337d13cc3a
|
@ -0,0 +1,3 @@
|
|||
```release-note:change
|
||||
core: Revert #19676 (VAULT_GRPC_MIN_CONNECT_TIMEOUT env var) as we decided it was unnecessary.
|
||||
```
|
|
@ -330,8 +330,7 @@ func (c *Core) startClusterListener(ctx context.Context) error {
|
|||
c.clusterListener.Store(cluster.NewListener(networkLayer,
|
||||
c.clusterCipherSuites,
|
||||
listenerLogger,
|
||||
5*c.clusterHeartbeatInterval,
|
||||
c.grpcMinConnectTimeout))
|
||||
5*c.clusterHeartbeatInterval))
|
||||
|
||||
c.AddLogger(listenerLogger)
|
||||
|
||||
|
|
|
@ -75,10 +75,9 @@ type Listener struct {
|
|||
logger log.Logger
|
||||
l sync.RWMutex
|
||||
tlsConnectionLoggingLevel log.Level
|
||||
grpcMinConnectTimeout time.Duration
|
||||
}
|
||||
|
||||
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout, grpcMinConnectTimeout time.Duration) *Listener {
|
||||
func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Logger, idleTimeout time.Duration) *Listener {
|
||||
var maxStreams uint32 = math.MaxUint32
|
||||
if override := os.Getenv("VAULT_GRPC_MAX_STREAMS"); override != "" {
|
||||
i, err := strconv.ParseUint(override, 10, 32)
|
||||
|
@ -115,7 +114,6 @@ func NewListener(networkLayer NetworkLayer, cipherSuites []uint16, logger log.Lo
|
|||
cipherSuites: cipherSuites,
|
||||
logger: logger,
|
||||
tlsConnectionLoggingLevel: log.LevelFromString(os.Getenv("VAULT_CLUSTER_TLS_SESSION_LOG_LEVEL")),
|
||||
grpcMinConnectTimeout: grpcMinConnectTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -466,21 +464,10 @@ func (cl *Listener) GetDialerFunc(ctx context.Context, alpn string) func(string,
|
|||
}
|
||||
|
||||
tlsConfig.NextProtos = []string{alpn}
|
||||
args := []interface{}{
|
||||
"address", addr,
|
||||
"alpn", alpn,
|
||||
"host", tlsConfig.ServerName,
|
||||
"timeout", fmt.Sprintf("%s", timeout),
|
||||
}
|
||||
if cl.grpcMinConnectTimeout != 0 {
|
||||
args = append(args, "timeout_env_override", fmt.Sprintf("%s", cl.grpcMinConnectTimeout))
|
||||
}
|
||||
cl.logger.Debug("creating rpc dialer", args...)
|
||||
cl.logger.Debug("creating rpc dialer", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName)
|
||||
|
||||
start := time.Now()
|
||||
conn, err := cl.networkLayer.Dial(addr, timeout, tlsConfig)
|
||||
if err != nil {
|
||||
cl.logger.Debug("dial failure", "address", addr, "alpn", alpn, "host", tlsConfig.ServerName, "duration", fmt.Sprintf("%s", time.Since(start)), "error", err)
|
||||
return nil, err
|
||||
}
|
||||
cl.logTLSSessionStart(conn.RemoteAddr().String(), conn.ConnectionState())
|
||||
|
|
|
@ -695,9 +695,6 @@ type Core struct {
|
|||
// if populated, the callback is called for every request
|
||||
// for testing purposes
|
||||
requestResponseCallback func(logical.Backend, *logical.Request, *logical.Response)
|
||||
|
||||
// if populated, override the default gRPC min connect timeout (currently 20s in grpc 1.51)
|
||||
grpcMinConnectTimeout time.Duration
|
||||
}
|
||||
|
||||
// c.stateLock needs to be held in read mode before calling this function.
|
||||
|
@ -1282,16 +1279,6 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
c.events.Start()
|
||||
}
|
||||
|
||||
minConnectTimeoutRaw := os.Getenv("VAULT_GRPC_MIN_CONNECT_TIMEOUT")
|
||||
if minConnectTimeoutRaw != "" {
|
||||
dur, err := time.ParseDuration(minConnectTimeoutRaw)
|
||||
if err != nil {
|
||||
c.logger.Warn("VAULT_GRPC_MIN_CONNECT_TIMEOUT contains non-duration value, ignoring")
|
||||
} else if dur != 0 {
|
||||
c.grpcMinConnectTimeout = dur
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/hashicorp/vault/vault/replication"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
|
@ -279,8 +278,7 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
|
|||
// ALPN header right. It's just "insecure" because GRPC isn't managing
|
||||
// the TLS state.
|
||||
dctx, cancelFunc := context.WithCancel(ctx)
|
||||
|
||||
opts := []grpc.DialOption{
|
||||
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host,
|
||||
grpc.WithDialer(clusterListener.GetDialerFunc(ctx, consts.RequestForwardingALPN)),
|
||||
grpc.WithInsecure(), // it's not, we handle it in the dialer
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
|
@ -289,15 +287,7 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
|
|||
grpc.WithDefaultCallOptions(
|
||||
grpc.MaxCallRecvMsgSize(math.MaxInt32),
|
||||
grpc.MaxCallSendMsgSize(math.MaxInt32),
|
||||
),
|
||||
}
|
||||
if c.grpcMinConnectTimeout != 0 {
|
||||
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{
|
||||
MinConnectTimeout: c.grpcMinConnectTimeout,
|
||||
Backoff: backoff.DefaultConfig,
|
||||
}))
|
||||
}
|
||||
c.rpcClientConn, err = grpc.DialContext(dctx, clusterURL.Host, opts...)
|
||||
))
|
||||
if err != nil {
|
||||
cancelFunc()
|
||||
c.logger.Error("err setting up forwarding rpc client", "error", err)
|
||||
|
|
Loading…
Reference in New Issue