agent pprof endpoints

wip, agent endpoint and client endpoint for pprof profiles

agent endpoint test
This commit is contained in:
Drew Bailey 2019-12-04 08:36:12 -05:00
parent b5bcfb533b
commit 49ad5fbc85
No known key found for this signature in database
GPG Key ID: FBA61B9FB7CCE1A7
9 changed files with 337 additions and 1 deletions

View File

@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
)
@ -288,6 +289,27 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream
return frames, errCh
}
func (a *Agent) Pprof() ([]byte, error) {
r, err := a.client.newRequest("GET", "/debug/pprof/profile")
if err != nil {
return nil, err
}
_, resp, err := a.client.doRequest(r)
if err != nil {
return nil, fmt.Errorf("error making request: %s", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error decoding body: %s", err)
}
return body, nil
}
// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
@ -28,6 +29,28 @@ func NewAgentEndpoint(c *Client) *Agent {
return m
}
func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error {
var resp []byte
var err error
switch args.ReqType {
case profile.CPUReq:
resp, err = profile.CPUProfile(args.Seconds)
case profile.CmdReq:
resp, err = profile.Cmdline()
case profile.LookupReq:
resp, err = profile.Profile(args.Profile, args.Debug)
case profile.TraceReq:
resp, err = profile.Trace(args.Seconds)
}
if err != nil {
return err
}
reply.Payload = resp
return nil
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()
@ -76,6 +99,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@ -213,3 +214,28 @@ func TestMonitor_Monitor_ACL(t *testing.T) {
})
}
}
func TestAgent_Pprof(t *testing.T) {
t.Parallel()
// start server and client
s := nomad.TestServer(t, nil)
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)
c, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{s.GetConfig().RPCAddr.String()}
})
defer cleanup()
req := cstructs.AgentPprofRequest{
Profile: "allocs",
ReqType: profile.LookupReq,
}
reply := cstructs.AgentPprofResponse{}
err := c.ClientRPC("Agent.Profile", req, &reply)
require.NoError(t, err)
}

View File

@ -236,6 +236,7 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) {
server.Register(c.endpoints.ClientStats)
server.Register(c.endpoints.FileSystem)
server.Register(c.endpoints.Allocations)
server.Register(c.endpoints.Agent)
}
// rpcConnListener is a long lived function that listens for new connections

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
)
@ -53,6 +54,34 @@ type MonitorRequest struct {
structs.QueryOptions
}
type AgentPprofRequest struct {
// Profile specifies the profile to use
ReqType profile.ReqType
Profile string
// Seconds is the number of seconds to capture a profile
Seconds int
// Debug specifies if pprof profile should inclue debug output
Debug int
// NodeID is the node we want to track the logs of
NodeID string
// ServerID is the server we want to track the logs of
ServerID string
structs.QueryOptions
}
type AgentPprofResponse struct {
// Error stores any error that may have occurred.
Error *RpcError
Payload []byte
}
// AllocFileInfo holds information about a file inside the AllocDir
type AllocFileInfo struct {
Name string

View File

@ -16,6 +16,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/profile"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
@ -228,7 +229,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
handlerErr = CodedError(400, "No local Node and node_id not provided")
}
} else {
// No node id we want to monitor this server
// No node id monitor server
handler, handlerErr = s.agent.Server().StreamingRpcHandler("Agent.Monitor")
}
@ -333,6 +334,96 @@ func (s *HTTPServer) AgentForceLeaveRequest(resp http.ResponseWriter, req *http.
return nil, err
}
func (s *HTTPServer) AgentPprofRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
path := strings.TrimPrefix(req.URL.Path, "/v1/agent/pprof/")
switch {
case path == "":
// no index route
return nil, CodedError(404, ErrInvalidMethod)
case path == "cmdline":
return s.agentPprofReq(profile.CmdReq, "", resp, req)
case path == "profile":
return s.agentPprofReq(profile.CPUReq, "", resp, req)
case path == "trace":
return s.agentPprofReq(profile.TraceReq, "", resp, req)
default:
// generic pprof profile request
return s.agentPprofReq(profile.LookupReq, path, resp, req)
}
}
func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
// Check agent read permissions
if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil {
return nil, err
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return nil, structs.ErrPermissionDenied
}
nodeID := req.URL.Query().Get("node_id")
serverID := req.URL.Query().Get("server_id")
secondsParam := req.URL.Query().Get("seconds")
// Parse profile duration, default to 1 second
var seconds int
if secondsParam == "" {
seconds = 1
} else {
sec, err := strconv.Atoi(secondsParam)
if err != nil {
return nil, CodedError(400, err.Error())
}
seconds = sec
}
// Create the request
args := &cstructs.AgentPprofRequest{
ReqType: reqType,
Profile: profile,
NodeID: nodeID,
ServerID: serverID,
Seconds: seconds,
}
// if node and server were requested return error
if args.NodeID != "" && args.ServerID != "" {
return nil, CodedError(400, "Cannot target node and server simultaneously")
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
var reply cstructs.AgentPprofResponse
var rpcErr error
if args.NodeID != "" {
// Make the RPC
localClient, remoteClient, localServer := s.rpcHandlerForNode(nodeID)
// var handler structs.StreamingRpcHandler
if localClient {
rpcErr = s.agent.Client().ClientRPC("Agent.Profile", &args, &reply)
} else if remoteClient {
rpcErr = s.agent.Client().RPC("Agent.Profile", &args, &reply)
} else if localServer {
rpcErr = s.agent.Server().RPC("Agent.Profile", &args, &reply)
}
} else {
// No node id request server
rpcErr = s.agent.Server().RPC("Agent.Profile", &args, &reply)
}
if rpcErr != nil {
return nil, rpcErr
}
// resp.Write(reply.Payload)
return reply.Payload, nil
// return string(reply.Payload), rpcErr
}
// AgentServersRequest is used to query the list of servers used by the Nomad
// Client for RPCs. This endpoint can also be used to update the list of
// servers for a given agent.

View File

@ -388,6 +388,54 @@ func TestHTTP_AgentMonitor(t *testing.T) {
})
}
func TestAgent_PprofRequest(t *testing.T) {
cases := []struct {
desc string
url string
addNodeID bool
addServerID bool
expectedErr error
expectedStatus int
}{
{
desc: "cmdline request",
url: "/v1/agent/pprof/cmdline",
addNodeID: true,
expectedStatus: 200,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
// add node or server id query param
url := tc.url
if tc.addNodeID {
url = url + "?node_id=" + s.client.NodeID()
} else if tc.addServerID {
url = url + "?server_id=" + s.server.LocalMember().Name
}
req, err := http.NewRequest("GET", url, nil)
require.Nil(t, err)
respW := httptest.NewRecorder()
resp, err := s.Server.AgentPprofRequest(respW, req)
if tc.expectedErr != nil {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, resp)
}
require.Equal(t, tc.expectedStatus, respW.Code)
})
})
}
}
type closableRecorder struct {
*httptest.ResponseRecorder
closer chan bool

View File

@ -186,6 +186,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest))
s.mux.HandleFunc("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.mux.HandleFunc("/v1/agent/pprof/", s.wrap(s.AgentPprofRequest))
s.mux.HandleFunc("/v1/metrics", s.wrap(s.MetricsRequest))
s.mux.HandleFunc("/v1/validate/job", s.wrap(s.ValidateJobRequest))

View File

@ -0,0 +1,93 @@
package profile
import (
"bytes"
"context"
"fmt"
"os"
"runtime/pprof"
"runtime/trace"
"strings"
"time"
)
// Cmdline responds with the running program's
// command line, with arguments separated by NUL bytes.
func Cmdline() ([]byte, error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, strings.Join(os.Args, "\x00"))
return buf.Bytes(), nil
}
// goroutine
// threadcreate
// heap
// allocs
// block
// mutex
type ReqType string
const (
CmdReq ReqType = "cmdline"
CPUReq ReqType = "cpu"
TraceReq ReqType = "trace"
LookupReq ReqType = "profile"
)
// Profile
func Profile(profile string, debug int) ([]byte, error) {
p := pprof.Lookup(profile)
if p == nil {
return nil, fmt.Errorf("Unknown profile: %s", profile)
}
var buf bytes.Buffer
if err := p.WriteTo(&buf, debug); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func CPUProfile(sec int) ([]byte, error) {
if sec <= 0 {
sec = 1
}
var buf bytes.Buffer
if err := pprof.StartCPUProfile(&buf); err != nil {
// trace.Start failed, no writes yet
return nil, err
}
sleep(context.TODO(), time.Duration(sec)*time.Second)
pprof.StopCPUProfile()
return buf.Bytes(), nil
}
func Trace(sec int) ([]byte, error) {
if sec <= 0 {
sec = 1
}
var buf bytes.Buffer
if err := trace.Start(&buf); err != nil {
// trace.Start failed, no writes yet
return nil, err
}
sleep(context.TODO(), time.Duration(sec)*time.Second)
trace.Stop()
return buf.Bytes(), nil
}
func sleep(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}