diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index 6ad62bae4..a95a5345d 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -77,7 +77,6 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { var buf bytes.Buffer frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) - // framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024) framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) framer.Run() defer framer.Destroy() diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 6c66c2233..a3da1aa39 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -62,72 +62,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { // Targeting a client so forward the request if args.NodeID != "" { - nodeID := args.NodeID - - snap, err := m.srv.State().Snapshot() - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - node, err := snap.NodeByID(nil, nodeID) - if err != nil { - handleStreamResultError(err, helper.Int64ToPtr(500), encoder) - return - } - - if node == nil { - err := fmt.Errorf("Unknown node %q", nodeID) - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } - - if err := nodeSupportsRpc(node); err != nil { - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } - - // Get the Connection to the client either by fowarding to another server - // or creating direct stream - var clientConn net.Conn - state, ok := m.srv.getNodeConn(nodeID) - if !ok { - // Determine the server that has a connection to the node - srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region()) - if err != nil { - var code *int64 - if structs.IsErrNoNodeConn(err) { - code = helper.Int64ToPtr(404) - } - handleStreamResultError(err, code, encoder) - return - } - conn, err := m.srv.streamingRpc(srv, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - clientConn = conn - } else { - stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor") - if err != nil { - handleStreamResultError(err, nil, encoder) - return - } - clientConn = stream - } - defer clientConn.Close() - - // Send the Request - outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, clientConn) - return + m.forwardMonitor(conn, args, encoder, decoder) } // NodeID was empty, so monitor this current server @@ -234,3 +169,72 @@ OUTER: } } } + +func (m *Agent) forwardMonitor(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { + nodeID := args.NodeID + + snap, err := m.srv.State().Snapshot() + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + if err := nodeSupportsRpc(node); err != nil { + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + // Get the Connection to the client either by fowarding to another server + // or creating direct stream + var clientConn net.Conn + state, ok := m.srv.getNodeConn(nodeID) + if !ok { + // Determine the server that has a connection to the node + srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region()) + if err != nil { + var code *int64 + if structs.IsErrNoNodeConn(err) { + code = helper.Int64ToPtr(404) + } + handleStreamResultError(err, code, encoder) + return + } + conn, err := m.srv.streamingRpc(srv, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + clientConn = conn + } else { + stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor") + if err != nil { + handleStreamResultError(err, nil, encoder) + return + } + clientConn = stream + } + defer clientConn.Close() + + // Send the Request + outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, clientConn) + return +}