2018-01-11 19:24:57 +00:00
package client
import (
2018-02-06 21:03:09 +00:00
"errors"
2018-01-11 21:23:57 +00:00
"io"
2018-01-11 19:24:57 +00:00
"net"
"net/rpc"
"strings"
2018-01-09 23:26:53 +00:00
"time"
2018-01-11 19:24:57 +00:00
2018-01-11 21:23:57 +00:00
metrics "github.com/armon/go-metrics"
2018-01-09 23:26:53 +00:00
"github.com/hashicorp/consul/lib"
2020-03-18 11:27:32 +00:00
"github.com/hashicorp/go-msgpack/codec"
2018-02-01 01:35:21 +00:00
"github.com/hashicorp/nomad/client/servers"
2018-01-19 00:51:49 +00:00
inmem "github.com/hashicorp/nomad/helper/codec"
2018-01-12 21:58:44 +00:00
"github.com/hashicorp/nomad/helper/pool"
2018-01-09 23:26:53 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2018-01-11 19:24:57 +00:00
)
// rpcEndpoints holds the RPC endpoints
type rpcEndpoints struct {
2020-04-02 20:04:56 +00:00
ClientStats * ClientStats
CSI * CSI
FileSystem * FileSystem
Allocations * Allocations
Agent * Agent
2018-01-11 19:24:57 +00:00
}
// ClientRPC is used to make a local, client only RPC call
func ( c * Client ) ClientRPC ( method string , args interface { } , reply interface { } ) error {
2018-01-19 00:51:49 +00:00
codec := & inmem . InmemCodec {
2018-01-11 19:24:57 +00:00
Method : method ,
Args : args ,
Reply : reply ,
}
if err := c . rpcServer . ServeRequest ( codec ) ; err != nil {
return err
}
return codec . Err
}
2018-01-31 21:49:21 +00:00
// StreamingRpcHandler is used to make a local, client only streaming RPC
2018-01-21 01:19:55 +00:00
// call.
2018-01-31 21:49:21 +00:00
func ( c * Client ) StreamingRpcHandler ( method string ) ( structs . StreamingRpcHandler , error ) {
2018-01-21 01:19:55 +00:00
return c . streamingRpcs . GetHandler ( method )
}
2018-01-11 19:24:57 +00:00
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func ( c * Client ) RPC ( method string , args interface { } , reply interface { } ) error {
// Invoke the RPCHandler if it exists
if c . config . RPCHandler != nil {
return c . config . RPCHandler . RPC ( method , args , reply )
}
2020-11-30 20:11:10 +00:00
// We will try to automatically retry requests that fail due to things like server unavailability
// but instead of retrying forever, lets have a solid upper-bound
deadline := time . Now ( )
// A reasonable amount of time for leader election. Note when servers forward() our RPC requests
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK, we won't double up because we are using it here not as a sleep but
// as a hint to give up
deadline = deadline . Add ( c . config . RPCHoldTimeout )
// If its a blocking query, allow the time specified by the request
if info , ok := args . ( structs . RPCInfo ) ; ok {
deadline = deadline . Add ( info . TimeToBlock ( ) )
}
2018-01-09 23:26:53 +00:00
TRY :
server := c . servers . FindServer ( )
if server == nil {
2018-01-11 19:24:57 +00:00
return noServersErr
}
2018-01-09 23:26:53 +00:00
// Make the request.
2021-12-01 22:36:02 +00:00
rpcErr := c . connPool . RPC ( c . Region ( ) , server . Addr , method , args , reply )
2020-11-30 20:11:10 +00:00
2018-01-09 23:26:53 +00:00
if rpcErr == nil {
2018-04-04 01:05:28 +00:00
c . fireRpcRetryWatcher ( )
2018-01-11 19:24:57 +00:00
return nil
}
2018-11-15 00:08:17 +00:00
// If shutting down, exit without logging the error
select {
case <- c . shutdownCh :
return nil
default :
}
2018-01-09 23:26:53 +00:00
// Move off to another server, and see if we can retry.
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "error performing RPC to server" , "error" , rpcErr , "rpc" , method , "server" , server . Addr )
2018-01-09 23:26:53 +00:00
c . servers . NotifyFailedServer ( server )
2020-11-30 20:11:10 +00:00
if ! canRetry ( args , rpcErr ) {
c . rpcLogger . Error ( "error performing RPC to server which is not safe to automatically retry" , "error" , rpcErr , "rpc" , method , "server" , server . Addr )
return rpcErr
}
if time . Now ( ) . After ( deadline ) {
// Blocking queries are tricky. jitters and rpcholdtimes in multiple places can result in our server call taking longer than we wanted it to. For example:
// a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTime. If the server dies at t=7s we still want to retry
// so before we give up on blocking queries make one last attempt for an immediate answer
if info , ok := args . ( structs . RPCInfo ) ; ok && info . TimeToBlock ( ) > 0 {
info . SetTimeToBlock ( 0 )
return c . RPC ( method , args , reply )
}
c . rpcLogger . Error ( "error performing RPC to server, deadline exceeded, cannot retry" , "error" , rpcErr , "rpc" , method , "server" , server . Addr )
2018-01-09 23:26:53 +00:00
return rpcErr
}
2020-11-30 20:11:10 +00:00
// Wait to avoid thundering herd
2020-09-18 12:58:41 +00:00
select {
2020-11-30 20:11:10 +00:00
case <- time . After ( lib . RandomStagger ( c . config . RPCHoldTimeout / structs . JitterFraction ) ) :
// If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline.
if info , ok := args . ( structs . RPCInfo ) ; ok && info . TimeToBlock ( ) > 0 {
2020-12-09 19:05:18 +00:00
newBlockTime := time . Until ( deadline )
2020-11-30 20:11:10 +00:00
// We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response
if newBlockTime < 0 {
newBlockTime = 0
}
info . SetTimeToBlock ( newBlockTime )
return c . RPC ( method , args , reply )
}
2020-09-18 12:58:41 +00:00
goto TRY
case <- c . shutdownCh :
2018-01-09 23:26:53 +00:00
}
return rpcErr
}
// canRetry returns true if the given situation is safe for a retry.
2020-11-30 20:11:10 +00:00
func canRetry ( args interface { } , err error ) bool {
2018-01-09 23:26:53 +00:00
// No leader errors are always safe to retry since no state could have
// been changed.
if structs . IsErrNoLeader ( err ) {
return true
}
// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info , ok := args . ( structs . RPCInfo )
2020-11-30 20:11:10 +00:00
if ok && info . IsRead ( ) && lib . IsErrEOF ( err ) {
2018-01-09 23:26:53 +00:00
return true
}
return false
2018-01-11 19:24:57 +00:00
}
2018-02-01 01:35:21 +00:00
// RemoteStreamingRpcHandler is used to make a streaming RPC call to a remote
// server.
func ( c * Client ) RemoteStreamingRpcHandler ( method string ) ( structs . StreamingRpcHandler , error ) {
server := c . servers . FindServer ( )
if server == nil {
return nil , noServersErr
}
conn , err := c . streamingRpcConn ( server , method )
if err != nil {
// Move off to another server
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "error performing RPC to server" , "error" , err , "rpc" , method , "server" , server . Addr )
2018-02-01 01:35:21 +00:00
c . servers . NotifyFailedServer ( server )
return nil , err
}
return bridgedStreamingRpcHandler ( conn ) , nil
}
// bridgedStreamingRpcHandler returns a streaming RPC handler that bridges
// sideA with whatever connection the handler is invoked with, copying data
// between the two sides via structs.Bridge and closing both once it returns.
func bridgedStreamingRpcHandler(sideA io.ReadWriteCloser) structs.StreamingRpcHandler {
	bridge := func(sideB io.ReadWriteCloser) {
		// Deferred calls run last-in-first-out: sideB is closed before sideA.
		defer sideA.Close()
		defer sideB.Close()

		structs.Bridge(sideA, sideB)
	}
	return bridge
}
// streamingRpcConn is used to retrieve a connection to a server to conduct a
// streaming RPC.
func ( c * Client ) streamingRpcConn ( server * servers . Server , method string ) ( net . Conn , error ) {
// Dial the server
conn , err := net . DialTimeout ( "tcp" , server . Addr . String ( ) , 10 * time . Second )
if err != nil {
return nil , err
}
// Cast to TCPConn
if tcp , ok := conn . ( * net . TCPConn ) ; ok {
tcp . SetKeepAlive ( true )
tcp . SetNoDelay ( true )
}
// Check if TLS is enabled
2018-02-15 23:22:57 +00:00
c . tlsWrapLock . RLock ( )
tlsWrap := c . tlsWrap
c . tlsWrapLock . RUnlock ( )
if tlsWrap != nil {
// Switch the connection into TLS mode
if _ , err := conn . Write ( [ ] byte { byte ( pool . RpcTLS ) } ) ; err != nil {
conn . Close ( )
return nil , err
}
// Wrap the connection in a TLS client
tlsConn , err := tlsWrap ( c . Region ( ) , conn )
if err != nil {
conn . Close ( )
return nil , err
}
conn = tlsConn
}
2018-02-01 01:35:21 +00:00
// Write the multiplex byte to set the mode
if _ , err := conn . Write ( [ ] byte { byte ( pool . RpcStreaming ) } ) ; err != nil {
conn . Close ( )
return nil , err
}
// Send the header
encoder := codec . NewEncoder ( conn , structs . MsgpackHandle )
2018-02-06 21:03:09 +00:00
decoder := codec . NewDecoder ( conn , structs . MsgpackHandle )
2018-02-01 01:35:21 +00:00
header := structs . StreamingRpcHeader {
Method : method ,
}
if err := encoder . Encode ( header ) ; err != nil {
conn . Close ( )
return nil , err
}
2018-02-06 21:03:09 +00:00
// Wait for the acknowledgement
var ack structs . StreamingRpcAck
if err := decoder . Decode ( & ack ) ; err != nil {
conn . Close ( )
return nil , err
}
if ack . Error != "" {
conn . Close ( )
return nil , errors . New ( ack . Error )
}
2018-02-01 01:35:21 +00:00
return conn , nil
}
2018-01-11 19:24:57 +00:00
// setupClientRpc is used to setup the Client's RPC endpoints
2021-03-19 14:52:43 +00:00
func ( c * Client ) setupClientRpc ( rpcs map [ string ] interface { } ) {
2018-01-11 19:24:57 +00:00
// Create the RPC Server
c . rpcServer = rpc . NewServer ( )
2021-03-19 14:52:43 +00:00
// Initialize the RPC handlers
if rpcs != nil {
// override RPCs
for name , rpc := range rpcs {
c . rpcServer . RegisterName ( name , rpc )
}
} else {
c . endpoints . ClientStats = & ClientStats { c }
c . endpoints . CSI = & CSI { c }
c . endpoints . FileSystem = NewFileSystemEndpoint ( c )
c . endpoints . Allocations = NewAllocationsEndpoint ( c )
c . endpoints . Agent = NewAgentEndpoint ( c )
c . setupClientRpcServer ( c . rpcServer )
}
2018-01-11 21:23:57 +00:00
go c . rpcConnListener ( )
}
2018-02-06 00:16:20 +00:00
// setupClientRpcServer is used to populate a client RPC server with endpoints.
func ( c * Client ) setupClientRpcServer ( server * rpc . Server ) {
// Register the endpoints
server . Register ( c . endpoints . ClientStats )
2020-04-02 20:04:56 +00:00
server . Register ( c . endpoints . CSI )
2018-02-06 00:16:20 +00:00
server . Register ( c . endpoints . FileSystem )
server . Register ( c . endpoints . Allocations )
2019-12-04 13:36:12 +00:00
server . Register ( c . endpoints . Agent )
2018-02-06 00:16:20 +00:00
}
2018-01-11 21:23:57 +00:00
// rpcConnListener is a long lived function that listens for new connections
// being made on the connection pool and starts an RPC listener for each
// connection.
func ( c * Client ) rpcConnListener ( ) {
// Make a channel for new connections.
2021-06-07 14:22:37 +00:00
conns := make ( chan * pool . Conn , 4 )
2018-01-11 21:23:57 +00:00
c . connPool . SetConnListener ( conns )
for {
select {
case <- c . shutdownCh :
return
2021-06-07 14:22:37 +00:00
case conn , ok := <- conns :
2018-01-11 21:23:57 +00:00
if ! ok {
continue
}
2021-06-07 14:22:37 +00:00
go c . listenConn ( conn )
2018-01-11 21:23:57 +00:00
}
}
}
// listenConn is used to listen for connections being made from the server on
// pre-existing connection. This should be called in a goroutine.
2021-06-07 14:22:37 +00:00
func ( c * Client ) listenConn ( conn * pool . Conn ) {
2018-01-11 21:23:57 +00:00
for {
2021-06-07 14:22:37 +00:00
stream , err := conn . AcceptStream ( )
2018-01-11 21:23:57 +00:00
if err != nil {
2021-06-07 14:22:37 +00:00
if conn . IsClosed ( ) {
2018-01-11 21:23:57 +00:00
return
}
2019-01-09 14:22:47 +00:00
c . rpcLogger . Error ( "failed to accept RPC conn" , "error" , err )
2018-01-11 21:23:57 +00:00
continue
}
2021-06-07 14:22:37 +00:00
go c . handleConn ( stream )
2018-01-11 21:23:57 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "accept_conn" } , 1 )
}
}
2018-01-19 00:51:49 +00:00
// handleConn is used to determine if this is a RPC or Streaming RPC connection and
// invoke the correct handler
2018-01-11 21:23:57 +00:00
func ( c * Client ) handleConn ( conn net . Conn ) {
2018-01-19 00:51:49 +00:00
// Read a single byte
buf := make ( [ ] byte , 1 )
if _ , err := conn . Read ( buf ) ; err != nil {
if err != io . EOF {
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "error reading byte" , "error" , err )
2018-01-19 00:51:49 +00:00
}
conn . Close ( )
return
}
// Switch on the byte
switch pool . RPCType ( buf [ 0 ] ) {
case pool . RpcNomad :
c . handleNomadConn ( conn )
case pool . RpcStreaming :
c . handleStreamingConn ( conn )
default :
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "unrecognized RPC byte" , "byte" , buf [ 0 ] )
2018-01-19 00:51:49 +00:00
conn . Close ( )
return
}
}
// handleNomadConn is used to handle a single Nomad RPC connection.
func ( c * Client ) handleNomadConn ( conn net . Conn ) {
2018-01-11 21:23:57 +00:00
defer conn . Close ( )
2018-01-12 21:58:44 +00:00
rpcCodec := pool . NewServerCodec ( conn )
2018-01-11 21:23:57 +00:00
for {
select {
case <- c . shutdownCh :
return
default :
}
if err := c . rpcServer . ServeRequest ( rpcCodec ) ; err != nil {
if err != io . EOF && ! strings . Contains ( err . Error ( ) , "closed" ) {
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "error performing RPC" , "error" , err , "addr" , conn . RemoteAddr ( ) )
2018-01-11 21:23:57 +00:00
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "request_error" } , 1 )
}
return
}
metrics . IncrCounter ( [ ] string { "client" , "rpc" , "request" } , 1 )
}
2018-01-11 19:24:57 +00:00
}
2018-01-19 00:51:49 +00:00
// handleStreamingConn is used to handle a single Streaming Nomad RPC connection.
func ( c * Client ) handleStreamingConn ( conn net . Conn ) {
defer conn . Close ( )
// Decode the header
var header structs . StreamingRpcHeader
decoder := codec . NewDecoder ( conn , structs . MsgpackHandle )
if err := decoder . Decode ( & header ) ; err != nil {
if err != io . EOF && ! strings . Contains ( err . Error ( ) , "closed" ) {
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "error performing streaming RPC" , "error" , err , "addr" , conn . RemoteAddr ( ) )
2018-01-19 00:51:49 +00:00
metrics . IncrCounter ( [ ] string { "client" , "streaming_rpc" , "request_error" } , 1 )
}
return
}
2018-02-06 21:03:09 +00:00
ack := structs . StreamingRpcAck { }
2018-01-19 00:51:49 +00:00
handler , err := c . streamingRpcs . GetHandler ( header . Method )
if err != nil {
2018-08-29 22:05:03 +00:00
c . rpcLogger . Error ( "streaming RPC error" , "addr" , conn . RemoteAddr ( ) , "error" , err )
2018-01-19 00:51:49 +00:00
metrics . IncrCounter ( [ ] string { "client" , "streaming_rpc" , "request_error" } , 1 )
2018-02-06 21:03:09 +00:00
ack . Error = err . Error ( )
}
// Send the acknowledgement
encoder := codec . NewEncoder ( conn , structs . MsgpackHandle )
if err := encoder . Encode ( ack ) ; err != nil {
conn . Close ( )
return
}
if ack . Error != "" {
2018-01-19 00:51:49 +00:00
return
}
// Invoke the handler
metrics . IncrCounter ( [ ] string { "client" , "streaming_rpc" , "request" } , 1 )
handler ( conn )
}
2018-01-11 19:24:57 +00:00
// resolveServer takes a server's address as a string and returns its resolved
// net.Addr or an error.
//
// When the address has no port, the default client RPC port (4647) is used. A
// bracketed IPv6 literal without a port, such as "[::1]", is accepted: the
// brackets are removed before the default port is joined so the host is not
// bracketed twice. On failure the returned net.Addr is a true nil interface.
func resolveServer(s string) (net.Addr, error) {
	const defaultClientPort = "4647" // default client RPC port

	host, port, err := net.SplitHostPort(s)
	if err != nil {
		// Look only at the reason reported by the net package rather than at
		// the full message, which also embeds the caller-supplied address.
		var addrErr *net.AddrError
		if !errors.As(err, &addrErr) || !strings.Contains(addrErr.Err, "missing port") {
			return nil, err
		}

		host, port = s, defaultClientPort
		// JoinHostPort adds brackets around any host containing a colon, so
		// strip the ones the caller supplied.
		if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
			host = host[1 : len(host)-1]
		}
	}

	addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
	if err != nil {
		// Return a literal nil so callers never see a non-nil interface
		// wrapping a nil *net.TCPAddr.
		return nil, err
	}
	return addr, nil
}
2018-01-09 23:26:53 +00:00
// Ping is used to ping a particular server and returns whether it is healthy or
// a potential error.
2018-01-25 02:00:21 +00:00
func ( c * Client ) Ping ( srv net . Addr ) error {
2018-01-09 23:26:53 +00:00
var reply struct { }
2021-12-01 22:36:02 +00:00
err := c . connPool . RPC ( c . Region ( ) , srv , "Status.Ping" , struct { } { } , & reply )
2018-01-25 02:00:21 +00:00
return err
2018-01-09 23:26:53 +00:00
}
2018-04-04 01:05:28 +00:00
// rpcRetryWatcher returns a channel that will be closed if an event happens
// such that we expect the next RPC to be successful. The channel is created
// on first use and shared by all callers until fireRpcRetryWatcher closes it.
func (c *Client) rpcRetryWatcher() <-chan struct{} {
	c.rpcRetryLock.Lock()
	defer c.rpcRetryLock.Unlock()

	watch := c.rpcRetryCh
	if watch == nil {
		watch = make(chan struct{})
		c.rpcRetryCh = watch
	}
	return watch
}
// fireRpcRetryWatcher causes any RPC retry loops to retry their RPCs because
// we believe they will be successful. It closes the current watcher channel,
// if one exists, and clears it so that the next rpcRetryWatcher call creates
// a fresh one.
func (c *Client) fireRpcRetryWatcher() {
	c.rpcRetryLock.Lock()
	defer c.rpcRetryLock.Unlock()

	// Nobody is watching; nothing to signal.
	if c.rpcRetryCh == nil {
		return
	}
	close(c.rpcRetryCh)
	c.rpcRetryCh = nil
}