Server request forwarding for Agent.Profile

Return rpc errors for profile requests, set up remote forwarding to
target leader or server id for profile requests.

server forwarding, endpoint tests
This commit is contained in:
Drew Bailey 2019-12-09 10:55:43 -05:00
parent 901f362858
commit 50288461c9
No known key found for this signature in database
GPG Key ID: FBA61B9FB7CCE1A7
7 changed files with 351 additions and 34 deletions

View File

@ -32,21 +32,30 @@ func NewAgentEndpoint(c *Client) *Agent {
func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
var resp []byte
var err error
// Determine which profile to run
// and generate profile. Blocks for args.Seconds
switch args.ReqType {
case profile.CPUReq:
resp, err = profile.CPUProfile(args.Seconds)
resp, err = profile.CPUProfile(context.TODO(), args.Seconds)
case profile.CmdReq:
resp, err = profile.Cmdline()
case profile.LookupReq:
resp, err = profile.Profile(args.Profile, args.Debug)
case profile.TraceReq:
resp, err = profile.Trace(args.Seconds)
resp, err = profile.Trace(context.TODO(), args.Seconds)
}
if err != nil {
return err
if profile.IsErrProfileNotFound(err) {
return structs.NewErrRPCCoded(404, err.Error())
}
return structs.NewErrRPCCoded(500, err.Error())
}
// Copy profile response to reply
reply.Payload = resp
reply.AgentID = m.c.NodeID()
return nil
}

View File

@ -55,7 +55,6 @@ type MonitorRequest struct {
}
type AgentPprofRequest struct {
// Profile specifies the profile to use
ReqType profile.ReqType
@ -79,6 +78,9 @@ type AgentPprofResponse struct {
// Error stores any error that may have occurred.
Error *RpcError
// ID of the agent that fulfilled the request
AgentID string
Payload []byte
}

View File

@ -338,21 +338,23 @@ func (s *HTTPServer) AgentPprofRequest(resp http.ResponseWriter, req *http.Reque
path := strings.TrimPrefix(req.URL.Path, "/v1/agent/pprof/")
switch {
case path == "":
// no index route
// no root index route
return nil, CodedError(404, ErrInvalidMethod)
case path == "cmdline":
return s.agentPprofReq(profile.CmdReq, "", resp, req)
return s.agentPprofReq(profile.CmdReq, resp, req)
case path == "profile":
return s.agentPprofReq(profile.CPUReq, "", resp, req)
return s.agentPprofReq(profile.CPUReq, resp, req)
case path == "trace":
return s.agentPprofReq(profile.TraceReq, "", resp, req)
return s.agentPprofReq(profile.TraceReq, resp, req)
default:
// Add profile to request
req.URL.Query().Add("profile", path)
// generic pprof profile request
return s.agentPprofReq(profile.LookupReq, path, resp, req)
return s.agentPprofReq(profile.LookupReq, resp, req)
}
}
func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
@ -363,11 +365,8 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp
return nil, structs.ErrPermissionDenied
}
nodeID := req.URL.Query().Get("node_id")
serverID := req.URL.Query().Get("server_id")
secondsParam := req.URL.Query().Get("seconds")
// Parse profile duration, default to 1 second
secondsParam := req.URL.Query().Get("seconds")
var seconds int
if secondsParam == "" {
seconds = 1
@ -382,9 +381,9 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp
// Create the request
args := &cstructs.AgentPprofRequest{
ReqType: reqType,
Profile: profile,
NodeID: nodeID,
ServerID: serverID,
Profile: req.URL.Query().Get("profile"),
NodeID: req.URL.Query().Get("node_id"),
ServerID: req.URL.Query().Get("server_id"),
Seconds: seconds,
}
@ -399,9 +398,7 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp
var rpcErr error
if args.NodeID != "" {
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForNode(nodeID)
// var handler structs.StreamingRpcHandler
localClient, remoteClient, localServer := s.rpcHandlerForNode(args.NodeID)
if localClient {
rpcErr = s.agent.Client().ClientRPC("Agent.Profile", &args, &reply)
} else if remoteClient {
@ -415,7 +412,7 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp
}
if rpcErr != nil {
// TODO: rpcErr should return codedErr
// Return RPCCodedErr
return nil, rpcErr
}

View File

@ -11,14 +11,6 @@ import (
"time"
)
// Cmdline responds with the running program's
// command line, with arguments separated by NUL bytes.
func Cmdline() ([]byte, error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, strings.Join(os.Args, "\x00"))
return buf.Bytes(), nil
}
// goroutine
// threadcreate
// heap
@ -32,13 +24,36 @@ const (
CPUReq ReqType = "cpu"
TraceReq ReqType = "trace"
LookupReq ReqType = "profile"
ErrProfileNotFoundPrefix = "Pprof profile not found"
)
// Profile
// NewErrProfileNotFound returns a new error caused by a pprof.Lookup
// profile not being found
func NewErrProfileNotFound(profile string) error {
return fmt.Errorf("%s %s", ErrProfileNotFoundPrefix, profile)
}
// IsErrProfileNotFound returns whether the error is due to a pprof profile
// being invalid
func IsErrProfileNotFound(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrProfileNotFoundPrefix)
}
// Cmdline responds with the running program's
// command line, with arguments separated by NUL bytes.
func Cmdline() ([]byte, error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, strings.Join(os.Args, "\x00"))
return buf.Bytes(), nil
}
// Profile generates a pprof.Profile report for the given profile name
// see runtime/pprof/pprof.go for available profiles.
func Profile(profile string, debug int) ([]byte, error) {
p := pprof.Lookup(profile)
if p == nil {
return nil, fmt.Errorf("Unknown profile: %s", profile)
return nil, NewErrProfileNotFound(profile)
}
var buf bytes.Buffer
@ -49,7 +64,8 @@ func Profile(profile string, debug int) ([]byte, error) {
return buf.Bytes(), nil
}
func CPUProfile(sec int) ([]byte, error) {
// CPUProfile generates a CPU Profile for a given duration
func CPUProfile(ctx context.Context, sec int) ([]byte, error) {
if sec <= 0 {
sec = 1
}
@ -60,14 +76,15 @@ func CPUProfile(sec int) ([]byte, error) {
return nil, err
}
sleep(context.TODO(), time.Duration(sec)*time.Second)
sleep(ctx, time.Duration(sec)*time.Second)
pprof.StopCPUProfile()
return buf.Bytes(), nil
}
func Trace(sec int) ([]byte, error) {
// Trace runs a trace profile for a given duration
func Trace(ctx context.Context, sec int) ([]byte, error) {
if sec <= 0 {
sec = 1
}
@ -86,6 +103,7 @@ func Trace(sec int) ([]byte, error) {
}
func sleep(ctx context.Context, d time.Duration) {
// Sleep until duration is met or ctx is cancelled
select {
case <-time.After(d):
case <-ctx.Done():

View File

@ -13,6 +13,7 @@ import (
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
@ -27,6 +28,63 @@ func (m *Agent) register() {
m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor)
}
func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
// Targeting a node, forward request to node
if args.NodeID != "" {
return m.forwardProfileClient(args, reply)
}
currentServer := m.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
forwardServer = true
}
// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !m.srv.IsLeader() {
forwardServer = true
}
if forwardServer {
// forward the request
return m.forwardProfileServer(args, reply)
}
var resp []byte
var err error
// Mark which server fulfilled the request
reply.AgentID = m.srv.serf.LocalMember().Name
// Determine which profile to run
// and generate profile. Blocks for args.Seconds
switch args.ReqType {
case profile.CPUReq:
resp, err = profile.CPUProfile(context.TODO(), args.Seconds)
case profile.CmdReq:
resp, err = profile.Cmdline()
case profile.LookupReq:
resp, err = profile.Profile(args.Profile, args.Debug)
case profile.TraceReq:
resp, err = profile.Trace(context.TODO(), args.Seconds)
default:
err = structs.NewErrRPCCoded(404, "Unknown profile request type")
}
if err != nil {
if profile.IsErrProfileNotFound(err) {
return structs.NewErrRPCCoded(404, err.Error())
}
return structs.NewErrRPCCoded(500, err.Error())
}
// Copy profile response to reply
reply.Payload = resp
return nil
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer conn.Close()
@ -306,3 +364,96 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni
structs.Bridge(conn, serverConn)
return
}
func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
var target *serverParts
serverID := args.ServerID
// empty ServerID to prevent forwarding loop
args.ServerID = ""
if serverID == "leader" {
isLeader, remoteServer := m.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
if !isLeader && remoteServer == nil {
return structs.NewErrRPCCoded(400, structs.ErrNoLeader.Error())
}
} else {
// See if the server ID is a known member
serfMembers := m.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
target = srv
}
}
}
}
// Unable to find a server
if target == nil {
err := fmt.Errorf("unknown nomad server %s", serverID)
return structs.NewErrRPCCoded(400, err.Error())
}
// Forward the request
rpcErr := m.srv.forwardServer(target, "Agent.Profile", args, reply)
if rpcErr != nil {
return structs.NewErrRPCCoded(500, rpcErr.Error())
}
return nil
}
func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
if err != nil {
return structs.NewErrRPCCoded(500, err.Error())
}
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
return structs.NewErrRPCCoded(500, err.Error())
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
return structs.NewErrRPCCoded(400, err.Error())
}
if err := nodeSupportsRpc(node); err != nil {
return structs.NewErrRPCCoded(400, err.Error())
}
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
state, ok := m.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
if err != nil {
code := 500
if structs.IsErrNoNodeConn(err) {
code = 404
}
return structs.NewErrRPCCoded(code, err.Error())
}
rpcErr := m.srv.forwardServer(srv, "Agent.Profile", args, reply)
if rpcErr != nil {
return structs.NewErrRPCCoded(500, err.Error())
}
} else {
// NodeRpc
rpcErr := NodeRpc(state.Session, "Agent.Profile", args, reply)
if rpcErr != nil {
return structs.NewErrRPCCoded(500, err.Error())
}
}
return nil
}

View File

@ -15,6 +15,8 @@ import (
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -456,3 +458,140 @@ func TestMonitor_Monitor_ACL(t *testing.T) {
})
}
}
func TestAgentProfile_RemoteClient(t *testing.T) {
t.Parallel()
require := require.New(t)
// start server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.GetConfig().RPCAddr.String()}
})
defer cleanup()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
req := cstructs.AgentPprofRequest{
ReqType: profile.CPUReq,
NodeID: c.NodeID(),
}
reply := cstructs.AgentPprofResponse{}
err := s1.RPC("Agent.Profile", &req, &reply)
require.NoError(err)
require.NotNil(reply.Payload)
require.Equal(c.NodeID(), reply.AgentID)
}
func TestAgentProfile_Server(t *testing.T) {
t.Parallel()
// start servers
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// determine leader and nonleader
servers := []*Server{s1, s2}
var nonLeader *Server
var leader *Server
for _, s := range servers {
if !s.IsLeader() {
nonLeader = s
} else {
leader = s
}
}
cases := []struct {
desc string
serverID string
origin *Server
expectedErr string
expectedAgentID string
reqType profile.ReqType
}{
{
desc: "remote leader",
serverID: "leader",
origin: nonLeader,
reqType: profile.CmdReq,
expectedAgentID: leader.serf.LocalMember().Name,
},
{
desc: "remote server",
serverID: nonLeader.serf.LocalMember().Name,
origin: leader,
reqType: profile.CmdReq,
expectedAgentID: nonLeader.serf.LocalMember().Name,
},
{
desc: "serverID is current leader",
serverID: "leader",
origin: leader,
reqType: profile.CmdReq,
expectedAgentID: leader.serf.LocalMember().Name,
},
{
desc: "serverID is current server",
serverID: nonLeader.serf.LocalMember().Name,
origin: nonLeader,
reqType: profile.CPUReq,
expectedAgentID: nonLeader.serf.LocalMember().Name,
},
{
desc: "serverID is unknown",
serverID: uuid.Generate(),
origin: nonLeader,
reqType: profile.CmdReq,
expectedErr: "unknown nomad server",
expectedAgentID: "",
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
req := cstructs.AgentPprofRequest{
ReqType: tc.reqType,
ServerID: tc.serverID,
}
reply := cstructs.AgentPprofResponse{}
err := tc.origin.RPC("Agent.Profile", &req, &reply)
if tc.expectedErr != "" {
require.Contains(err.Error(), tc.expectedErr)
} else {
require.Nil(err)
require.NotNil(reply.Payload)
}
require.Equal(tc.expectedAgentID, reply.AgentID)
})
}
}

View File

@ -1067,6 +1067,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
server.Register(s.staticEndpoints.ClientStats)
server.Register(s.staticEndpoints.ClientAllocations)
server.Register(s.staticEndpoints.FileSystem)
server.Register(s.staticEndpoints.Agent)
// Create new dynamic endpoints and add them to the RPC server.
node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")}