open-nomad/nomad/client_rpc.go

299 lines
8.2 KiB
Go
Raw Normal View History

2018-01-15 22:48:53 +00:00
package nomad
import (
"errors"
2018-01-15 22:48:53 +00:00
"fmt"
2018-01-27 01:05:38 +00:00
"net"
2018-01-15 22:48:53 +00:00
"time"
"github.com/hashicorp/go-msgpack/codec"
2018-01-15 22:48:53 +00:00
multierror "github.com/hashicorp/go-multierror"
2018-01-27 01:05:38 +00:00
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper/pool"
2018-01-15 22:48:53 +00:00
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)
// nodeConnState is used to track connection information about a Nomad Client.
type nodeConnState struct {
// Session holds the multiplexed yamux Session for dialing back.
Session *yamux.Session
// Established is when the connection was established.
Established time.Time
// Ctx is the full RPC context
Ctx *RPCContext
2018-01-15 22:48:53 +00:00
}
// getNodeConn returns the connection to the given node and whether it exists.
func (s *Server) getNodeConn(nodeID string) (*nodeConnState, bool) {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
2018-04-26 20:22:09 +00:00
conns, ok := s.nodeConns[nodeID]
2018-05-10 22:45:57 +00:00
if !ok {
return nil, false
}
2018-04-26 20:22:09 +00:00
// Return the latest conn
var state *nodeConnState
for _, conn := range conns {
if state == nil || state.Established.Before(conn.Established) {
state = conn
}
}
// Shouldn't happen but rather be safe
if state == nil {
2018-09-15 23:23:13 +00:00
s.logger.Named("client_rpc").Warn("node exists in node connection map without any connection", "node_id", nodeID)
return nil, false
}
2018-01-15 22:48:53 +00:00
return state, ok
}
// connectedNodes returns the set of nodes we have a connection with.
func (s *Server) connectedNodes() map[string]time.Time {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
nodes := make(map[string]time.Time, len(s.nodeConns))
2018-04-26 20:22:09 +00:00
for nodeID, conns := range s.nodeConns {
for _, conn := range conns {
if nodes[nodeID].Before(conn.Established) {
nodes[nodeID] = conn.Established
}
}
2018-01-15 22:48:53 +00:00
}
return nodes
}
// addNodeConn adds the mapping between a node and its session.
func (s *Server) addNodeConn(ctx *RPCContext) {
// Hotpath the no-op
if ctx == nil || ctx.NodeID == "" {
return
}
s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
2018-04-26 20:22:09 +00:00
// Capture the tracked connections so far
currentConns := s.nodeConns[ctx.NodeID]
// Check if we already have the connection. If we do, just update the
// establish time.
for _, c := range currentConns {
if c.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
c.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
c.Established = time.Now()
return
}
}
// Add the new conn
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID], &nodeConnState{
2018-01-15 22:48:53 +00:00
Session: ctx.Session,
Established: time.Now(),
Ctx: ctx,
2018-04-26 20:22:09 +00:00
})
2018-01-15 22:48:53 +00:00
}
// removeNodeConn removes the mapping between a node and its session.
func (s *Server) removeNodeConn(ctx *RPCContext) {
// Hotpath the no-op
if ctx == nil || ctx.NodeID == "" {
return
}
s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
2018-04-26 20:22:09 +00:00
conns, ok := s.nodeConns[ctx.NodeID]
if !ok {
return
}
// It is important that we check that the connection being removed is the
// actual stored connection for the client. It is possible for the client to
// dial various addresses that all route to the same server. The most common
// case for this is the original address the client uses to connect to the
// server differs from the advertised address sent by the heartbeat.
2018-04-26 20:22:09 +00:00
for i, conn := range conns {
if conn.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
conn.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
2018-04-27 17:36:28 +00:00
if len(conns) == 1 {
// We are deleting the last conn, remove it from the map
delete(s.nodeConns, ctx.NodeID)
} else {
// Slice out the connection we are deleting
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID][:i], s.nodeConns[ctx.NodeID][i+1:]...)
}
return
}
}
2018-01-15 22:48:53 +00:00
}
// serverWithNodeConn is used to determine which remote server has the most
// recent connection to the given node. The local server is not queried.
// ErrNoNodeConn is returned if all local peers could be queried but did not
// have a connection to the node. Otherwise if a connection could not be found
// and there were RPC errors, an error is returned.
2018-01-30 06:01:42 +00:00
func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error) {
2018-01-15 22:48:53 +00:00
// We skip ourselves.
selfAddr := s.LocalMember().Addr.String()
// Build the request
req := &structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{
Region: s.config.Region,
},
}
2018-01-30 06:01:42 +00:00
// Select the list of servers to check based on what region we are querying
s.peerLock.RLock()
var rawTargets []*serverParts
if region == s.Region() {
rawTargets = make([]*serverParts, 0, len(s.localPeers))
for _, srv := range s.localPeers {
rawTargets = append(rawTargets, srv)
}
} else {
peers, ok := s.peers[region]
if !ok {
s.peerLock.RUnlock()
return nil, structs.ErrNoRegionPath
}
rawTargets = peers
}
targets := make([]*serverParts, 0, len(rawTargets))
for _, target := range rawTargets {
targets = append(targets, target.Copy())
}
s.peerLock.RUnlock()
2018-01-15 22:48:53 +00:00
// connections is used to store the servers that have connections to the
// requested node.
var mostRecentServer *serverParts
var mostRecent time.Time
var rpcErr multierror.Error
2018-01-30 06:01:42 +00:00
for _, server := range targets {
if server.Addr.String() == selfAddr {
2018-01-15 22:48:53 +00:00
continue
}
// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, "Status.HasNodeConn", &req, &resp)
2018-01-15 22:48:53 +00:00
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
}
if resp.Connected && resp.Established.After(mostRecent) {
mostRecentServer = server
mostRecent = resp.Established
}
}
// Return an error if there is no route to the node.
if mostRecentServer == nil {
if err := rpcErr.ErrorOrNil(); err != nil {
return nil, err
}
2018-01-27 01:05:38 +00:00
return nil, structs.ErrNoNodeConn
2018-01-15 22:48:53 +00:00
}
return mostRecentServer, nil
}
2018-01-27 01:05:38 +00:00
// NodeRpc is used to make an RPC call to a node. The method takes the
// Yamux session for the node and the method to be called.
func NodeRpc(session *yamux.Session, method string, args, reply interface{}) error {
// Open a new session
stream, err := session.Open()
if err != nil {
csi: CLI for volume status, registration/deregistration and plugin status (#7193) * command/csi: csi, csi_plugin, csi_volume * helper/funcs: move ExtraKeys from parse_config to UnusedKeys * command/agent/config_parse: use helper.UnusedKeys * api/csi: annotate CSIVolumes with hcl fields * command/csi_plugin: add Synopsis * command/csi_volume_register: use hcl.Decode style parsing * command/csi_volume_list * command/csi_volume_status: list format, cleanup * command/csi_plugin_list * command/csi_plugin_status * command/csi_volume_deregister * command/csi_volume: add Synopsis * api/contexts/contexts: add csi search contexts to the constants * command/commands: register csi commands * api/csi: fix struct tag for linter * command/csi_plugin_list: unused struct vars * command/csi_plugin_status: unused struct vars * command/csi_volume_list: unused struct vars * api/csi: add allocs to CSIPlugin * command/csi_plugin_status: format the allocs * api/allocations: copy Allocation.Stub in from structs * nomad/client_rpc: add some error context with Errorf * api/csi: collapse read & write alloc maps to a stub list * command/csi_volume_status: cleanup allocation display * command/csi_volume_list: use Schedulable instead of Healthy * command/csi_volume_status: use Schedulable instead of Healthy * command/csi_volume_list: sprintf string * command/csi: delete csi.go, csi_plugin.go * command/plugin: refactor csi components to sub-command plugin status * command/plugin: remove csi * command/plugin_status: remove csi * command/volume: remove csi * command/volume_status: split out csi specific * helper/funcs: add RemoveEqualFold * command/agent/config_parse: use helper.RemoveEqualFold * api/csi: do ,unusedKeys right * command/volume: refactor csi components to `nomad volume` * command/volume_register: split out csi specific * command/commands: use the new top level commands * command/volume_deregister: hardwired type csi for now * command/volume_status: csiFormatVolumes rescued from volume_list * command/plugin_status: avoid a panic on no args * command/volume_status: avoid a panic on no args * command/plugin_status: predictVolumeType * command/volume_status: predictVolumeType * nomad/csi_endpoint_test: move CreateTestPlugin to testing * command/plugin_status_test: use CreateTestCSIPlugin * nomad/structs/structs: add CSIPlugins and CSIVolumes search consts * nomad/state/state_store: add CSIPlugins and CSIVolumesByIDPrefix * nomad/search_endpoint: add CSIPlugins and CSIVolumes * command/plugin_status: move the header to the csi specific * command/volume_status: move the header to the csi specific * nomad/state/state_store: CSIPluginByID prefix * command/status: rename the search context to just Plugins/Volumes * command/plugin,volume_status: test return ids now * command/status: rename the search context to just Plugins/Volumes * command/plugin_status: support -json and -t * command/volume_status: support -json and -t * command/plugin_status_csi: comments * command/*_status: clean up text * api/csi: fix stale comments * command/volume: make deregister sound less fearsome * command/plugin_status: set the id length * command/plugin_status_csi: more compact plugin health * command/volume: better error message, comment
2020-03-06 15:09:10 +00:00
return fmt.Errorf("session open: %v", err)
2018-01-27 01:05:38 +00:00
}
defer stream.Close()
// Write the RpcNomad byte to set the mode
if _, err := stream.Write([]byte{byte(pool.RpcNomad)}); err != nil {
stream.Close()
csi: CLI for volume status, registration/deregistration and plugin status (#7193) * command/csi: csi, csi_plugin, csi_volume * helper/funcs: move ExtraKeys from parse_config to UnusedKeys * command/agent/config_parse: use helper.UnusedKeys * api/csi: annotate CSIVolumes with hcl fields * command/csi_plugin: add Synopsis * command/csi_volume_register: use hcl.Decode style parsing * command/csi_volume_list * command/csi_volume_status: list format, cleanup * command/csi_plugin_list * command/csi_plugin_status * command/csi_volume_deregister * command/csi_volume: add Synopsis * api/contexts/contexts: add csi search contexts to the constants * command/commands: register csi commands * api/csi: fix struct tag for linter * command/csi_plugin_list: unused struct vars * command/csi_plugin_status: unused struct vars * command/csi_volume_list: unused struct vars * api/csi: add allocs to CSIPlugin * command/csi_plugin_status: format the allocs * api/allocations: copy Allocation.Stub in from structs * nomad/client_rpc: add some error context with Errorf * api/csi: collapse read & write alloc maps to a stub list * command/csi_volume_status: cleanup allocation display * command/csi_volume_list: use Schedulable instead of Healthy * command/csi_volume_status: use Schedulable instead of Healthy * command/csi_volume_list: sprintf string * command/csi: delete csi.go, csi_plugin.go * command/plugin: refactor csi components to sub-command plugin status * command/plugin: remove csi * command/plugin_status: remove csi * command/volume: remove csi * command/volume_status: split out csi specific * helper/funcs: add RemoveEqualFold * command/agent/config_parse: use helper.RemoveEqualFold * api/csi: do ,unusedKeys right * command/volume: refactor csi components to `nomad volume` * command/volume_register: split out csi specific * command/commands: use the new top level commands * command/volume_deregister: hardwired type csi for now * command/volume_status: csiFormatVolumes rescued from volume_list * command/plugin_status: avoid a panic on no args * command/volume_status: avoid a panic on no args * command/plugin_status: predictVolumeType * command/volume_status: predictVolumeType * nomad/csi_endpoint_test: move CreateTestPlugin to testing * command/plugin_status_test: use CreateTestCSIPlugin * nomad/structs/structs: add CSIPlugins and CSIVolumes search consts * nomad/state/state_store: add CSIPlugins and CSIVolumesByIDPrefix * nomad/search_endpoint: add CSIPlugins and CSIVolumes * command/plugin_status: move the header to the csi specific * command/volume_status: move the header to the csi specific * nomad/state/state_store: CSIPluginByID prefix * command/status: rename the search context to just Plugins/Volumes * command/plugin,volume_status: test return ids now * command/status: rename the search context to just Plugins/Volumes * command/plugin_status: support -json and -t * command/volume_status: support -json and -t * command/plugin_status_csi: comments * command/*_status: clean up text * api/csi: fix stale comments * command/volume: make deregister sound less fearsome * command/plugin_status: set the id length * command/plugin_status_csi: more compact plugin health * command/volume: better error message, comment
2020-03-06 15:09:10 +00:00
return fmt.Errorf("set mode: %v", err)
2018-01-27 01:05:38 +00:00
}
// Make the RPC
err = msgpackrpc.CallWithCodec(pool.NewClientCodec(stream), method, args, reply)
if err != nil {
return err
2018-01-27 01:05:38 +00:00
}
return nil
}
// NodeStreamingRpc is used to make a streaming RPC call to a node. The method
// takes the Yamux session for the node and the method to be called. It conducts
// the initial handshake and returns a connection to be used or an error. It is
// the callers responsibility to close the connection if there is no error.
func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) {
// Open a new session
stream, err := session.Open()
if err != nil {
return nil, err
}
// Write the RpcNomad byte to set the mode
if _, err := stream.Write([]byte{byte(pool.RpcStreaming)}); err != nil {
stream.Close()
return nil, err
}
// Send the header
encoder := codec.NewEncoder(stream, structs.MsgpackHandle)
decoder := codec.NewDecoder(stream, structs.MsgpackHandle)
2018-01-27 01:05:38 +00:00
header := structs.StreamingRpcHeader{
Method: method,
}
if err := encoder.Encode(header); err != nil {
stream.Close()
return nil, err
}
// Wait for the acknowledgement
var ack structs.StreamingRpcAck
if err := decoder.Decode(&ack); err != nil {
stream.Close()
return nil, err
}
if ack.Error != "" {
stream.Close()
return nil, errors.New(ack.Error)
}
2018-01-27 01:05:38 +00:00
return stream, nil
}
2018-02-08 23:00:22 +00:00
// findNodeConnAndForward is a helper for finding the server with a connection
// to the given node and forwarding the RPC to the correct server. This does not
// work for streaming RPCs.
func findNodeConnAndForward(srv *Server, nodeID, method string, args, reply interface{}) error {
2018-02-08 23:00:22 +00:00
// Determine the Server that has a connection to the node.
srvWithConn, err := srv.serverWithNodeConn(nodeID, srv.Region())
if err != nil {
return err
}
if srvWithConn == nil {
return structs.ErrNoNodeConn
}
return srv.forwardServer(srvWithConn, method, args, reply)
}