rpc: use tls wrapped connection for streaming rpc
This ensures that server-to-server streaming RPC calls use the tls wrapped connections. Prior to this, `streamingRpcImpl` function uses tls for setting header and invoking the rpc method, but returns unwrapped tls connection. Thus, streaming writes fail with tls errors. This tls streaming bug existed since 0.8.0[1], but PR #5654[2] exacerbated it in 0.9.2. Prior to PR #5654, nomad client used to shuffle servers at every heartbeat -- `servers.Manager.setServers`[3] always shuffled servers and was called by heartbeat code[4]. Shuffling servers meant that a nomad client would heartbeat and establish a connection against all nomad servers eventually. When handling streaming RPC calls, nomad servers used these local connection to communicate directly to the client. The server-to-server forwarding logic was left mostly unexercised. PR #5654 means that a nomad client may connect to a single server only and caused the server-to-server forward streaming RPC code to get exercised more and unearthed the problem. [1] https://github.com/hashicorp/nomad/blob/v0.8.0/nomad/rpc.go#L501-L515 [2] https://github.com/hashicorp/nomad/pull/5654 [3] https://github.com/hashicorp/nomad/blob/v0.9.1/client/servers/manager.go#L198-L216 [4] https://github.com/hashicorp/nomad/blob/v0.9.1/client/client.go#L1603
This commit is contained in:
parent
9c9bec62fd
commit
ad39bcef60
22
nomad/rpc.go
22
nomad/rpc.go
|
@ -540,18 +540,14 @@ func (r *rpcHandler) streamingRpc(server *serverParts, method string) (net.Conn,
|
||||||
tcp.SetNoDelay(true)
|
tcp.SetNoDelay(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.streamingRpcImpl(conn, server.Region, method); err != nil {
|
return r.streamingRpcImpl(conn, server.Region, method)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// streamingRpcImpl takes a pre-established connection to a server and conducts
|
// streamingRpcImpl takes a pre-established connection to a server and conducts
|
||||||
// the handshake to establish a streaming RPC for the given method. If an error
|
// the handshake to establish a streaming RPC for the given method. If an error
|
||||||
// is returned, the underlying connection has been closed. Otherwise it is
|
// is returned, the underlying connection has been closed. Otherwise it is
|
||||||
// assumed that the connection has been hijacked by the RPC method.
|
// assumed that the connection has been hijacked by the RPC method.
|
||||||
func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) error {
|
func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) (net.Conn, error) {
|
||||||
// Check if TLS is enabled
|
// Check if TLS is enabled
|
||||||
r.tlsWrapLock.RLock()
|
r.tlsWrapLock.RLock()
|
||||||
tlsWrap := r.tlsWrap
|
tlsWrap := r.tlsWrap
|
||||||
|
@ -561,14 +557,14 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro
|
||||||
// Switch the connection into TLS mode
|
// Switch the connection into TLS mode
|
||||||
if _, err := conn.Write([]byte{byte(pool.RpcTLS)}); err != nil {
|
if _, err := conn.Write([]byte{byte(pool.RpcTLS)}); err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrap the connection in a TLS client
|
// Wrap the connection in a TLS client
|
||||||
tlsConn, err := tlsWrap(region, conn)
|
tlsConn, err := tlsWrap(region, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
conn = tlsConn
|
conn = tlsConn
|
||||||
}
|
}
|
||||||
|
@ -576,7 +572,7 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro
|
||||||
// Write the multiplex byte to set the mode
|
// Write the multiplex byte to set the mode
|
||||||
if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil {
|
if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the header
|
// Send the header
|
||||||
|
@ -587,22 +583,22 @@ func (r *rpcHandler) streamingRpcImpl(conn net.Conn, region, method string) erro
|
||||||
}
|
}
|
||||||
if err := encoder.Encode(header); err != nil {
|
if err := encoder.Encode(header); err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the acknowledgement
|
// Wait for the acknowledgement
|
||||||
var ack structs.StreamingRpcAck
|
var ack structs.StreamingRpcAck
|
||||||
if err := decoder.Decode(&ack); err != nil {
|
if err := decoder.Decode(&ack); err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if ack.Error != "" {
|
if ack.Error != "" {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return errors.New(ack.Error)
|
return nil, errors.New(ack.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
|
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
|
||||||
|
|
|
@ -453,7 +453,7 @@ func TestRPC_handleMultiplexV2(t *testing.T) {
|
||||||
require.NotEmpty(l)
|
require.NotEmpty(l)
|
||||||
|
|
||||||
// Make a streaming RPC
|
// Make a streaming RPC
|
||||||
err = s.streamingRpcImpl(s2, s.Region(), "Bogus")
|
_, err = s.streamingRpcImpl(s2, s.Region(), "Bogus")
|
||||||
require.NotNil(err)
|
require.NotNil(err)
|
||||||
require.Contains(err.Error(), "Bogus")
|
require.Contains(err.Error(), "Bogus")
|
||||||
require.True(structs.IsErrUnknownMethod(err))
|
require.True(structs.IsErrUnknownMethod(err))
|
||||||
|
|
Loading…
Reference in a new issue