move forwarded monitor request into helper
This commit is contained in:
parent
8726b685de
commit
298b8358a9
|
@ -77,7 +77,6 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
|
|||
var buf bytes.Buffer
|
||||
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
|
||||
|
||||
// framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024)
|
||||
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
|
||||
framer.Run()
|
||||
defer framer.Destroy()
|
||||
|
|
|
@ -62,72 +62,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
|
|||
|
||||
// Targeting a client so forward the request
|
||||
if args.NodeID != "" {
|
||||
nodeID := args.NodeID
|
||||
|
||||
snap, err := m.srv.State().Snapshot()
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
node, err := snap.NodeByID(nil, nodeID)
|
||||
if err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
if node == nil {
|
||||
err := fmt.Errorf("Unknown node %q", nodeID)
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
if err := nodeSupportsRpc(node); err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
// Get the Connection to the client either by fowarding to another server
|
||||
// or creating direct stream
|
||||
var clientConn net.Conn
|
||||
state, ok := m.srv.getNodeConn(nodeID)
|
||||
if !ok {
|
||||
// Determine the server that has a connection to the node
|
||||
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
|
||||
if err != nil {
|
||||
var code *int64
|
||||
if structs.IsErrNoNodeConn(err) {
|
||||
code = helper.Int64ToPtr(404)
|
||||
}
|
||||
handleStreamResultError(err, code, encoder)
|
||||
return
|
||||
}
|
||||
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
clientConn = conn
|
||||
} else {
|
||||
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
clientConn = stream
|
||||
}
|
||||
defer clientConn.Close()
|
||||
|
||||
// Send the Request
|
||||
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
|
||||
if err := outEncoder.Encode(args); err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
structs.Bridge(conn, clientConn)
|
||||
return
|
||||
m.forwardMonitor(conn, args, encoder, decoder)
|
||||
}
|
||||
|
||||
// NodeID was empty, so monitor this current server
|
||||
|
@ -234,3 +169,72 @@ OUTER:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
|
||||
nodeID := args.NodeID
|
||||
|
||||
snap, err := m.srv.State().Snapshot()
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
node, err := snap.NodeByID(nil, nodeID)
|
||||
if err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
if node == nil {
|
||||
err := fmt.Errorf("Unknown node %q", nodeID)
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
if err := nodeSupportsRpc(node); err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
// Get the Connection to the client either by fowarding to another server
|
||||
// or creating direct stream
|
||||
var clientConn net.Conn
|
||||
state, ok := m.srv.getNodeConn(nodeID)
|
||||
if !ok {
|
||||
// Determine the server that has a connection to the node
|
||||
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
|
||||
if err != nil {
|
||||
var code *int64
|
||||
if structs.IsErrNoNodeConn(err) {
|
||||
code = helper.Int64ToPtr(404)
|
||||
}
|
||||
handleStreamResultError(err, code, encoder)
|
||||
return
|
||||
}
|
||||
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
clientConn = conn
|
||||
} else {
|
||||
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
|
||||
if err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
clientConn = stream
|
||||
}
|
||||
defer clientConn.Close()
|
||||
|
||||
// Send the Request
|
||||
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
|
||||
if err := outEncoder.Encode(args); err != nil {
|
||||
handleStreamResultError(err, nil, encoder)
|
||||
return
|
||||
}
|
||||
|
||||
structs.Bridge(conn, clientConn)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue