diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index 6e1fa3d2d..6744b8a9a 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -32,21 +32,30 @@ func NewAgentEndpoint(c *Client) *Agent { func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error { var resp []byte var err error + + // Determine which profile to run + // and generate profile. Blocks for args.Seconds switch args.ReqType { case profile.CPUReq: - resp, err = profile.CPUProfile(args.Seconds) + resp, err = profile.CPUProfile(context.TODO(), args.Seconds) case profile.CmdReq: resp, err = profile.Cmdline() case profile.LookupReq: resp, err = profile.Profile(args.Profile, args.Debug) case profile.TraceReq: - resp, err = profile.Trace(args.Seconds) + resp, err = profile.Trace(context.TODO(), args.Seconds) } if err != nil { - return err + if profile.IsErrProfileNotFound(err) { + return structs.NewErrRPCCoded(404, err.Error()) + } + return structs.NewErrRPCCoded(500, err.Error()) } + + // Copy profile response to reply reply.Payload = resp + reply.AgentID = m.c.NodeID() return nil } diff --git a/client/structs/structs.go b/client/structs/structs.go index 85db4f689..5429aaf38 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -55,7 +55,6 @@ type MonitorRequest struct { } type AgentPprofRequest struct { - // Profile specifies the profile to use ReqType profile.ReqType @@ -79,6 +78,9 @@ type AgentPprofResponse struct { // Error stores any error that may have occurred. Error *RpcError + // ID of the agent that fulfilled the request + AgentID string + Payload []byte } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index f7d272c2d..d817e94ef 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -338,21 +338,23 @@ func (s *HTTPServer) AgentPprofRequest(resp http.ResponseWriter, req *http.Reque path := strings.TrimPrefix(req.URL.Path, "/v1/agent/pprof/") switch { case path == "": - // no index route + // no root index route return nil, CodedError(404, ErrInvalidMethod) case path == "cmdline": - return s.agentPprofReq(profile.CmdReq, "", resp, req) + return s.agentPprofReq(profile.CmdReq, resp, req) case path == "profile": - return s.agentPprofReq(profile.CPUReq, "", resp, req) + return s.agentPprofReq(profile.CPUReq, resp, req) case path == "trace": - return s.agentPprofReq(profile.TraceReq, "", resp, req) + return s.agentPprofReq(profile.TraceReq, resp, req) default: + // Add profile to request + req.URL.Query().Add("profile", path) // generic pprof profile request - return s.agentPprofReq(profile.LookupReq, path, resp, req) + return s.agentPprofReq(profile.LookupReq, resp, req) } } -func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, resp http.ResponseWriter, req *http.Request) (interface{}, error) { var secret string s.parseToken(req, &secret) @@ -363,11 +365,8 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp return nil, structs.ErrPermissionDenied } - nodeID := req.URL.Query().Get("node_id") - serverID := req.URL.Query().Get("server_id") - secondsParam := req.URL.Query().Get("seconds") - // Parse profile duration, default to 1 second + secondsParam := req.URL.Query().Get("seconds") var seconds int if secondsParam == "" { seconds = 1 @@ -382,9 +381,9 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp // Create the request args := &cstructs.AgentPprofRequest{ ReqType: reqType, - Profile: profile, - NodeID: nodeID, - ServerID: serverID, + Profile: req.URL.Query().Get("profile"), + NodeID: req.URL.Query().Get("node_id"), + ServerID: req.URL.Query().Get("server_id"), Seconds: seconds, } @@ -399,9 +398,7 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp var rpcErr error if args.NodeID != "" { // Make the RPC - localClient, remoteClient, localServer := s.rpcHandlerForNode(nodeID) - - // var handler structs.StreamingRpcHandler + localClient, remoteClient, localServer := s.rpcHandlerForNode(args.NodeID) if localClient { rpcErr = s.agent.Client().ClientRPC("Agent.Profile", &args, &reply) } else if remoteClient { @@ -415,7 +412,7 @@ func (s *HTTPServer) agentPprofReq(reqType profile.ReqType, profile string, resp } if rpcErr != nil { - // TODO: rpcErr should return codedErr + // Return RPCCodedErr return nil, rpcErr } diff --git a/command/agent/profile/pprof.go b/command/agent/profile/pprof.go index 27df15c63..d3285b1a7 100644 --- a/command/agent/profile/pprof.go +++ b/command/agent/profile/pprof.go @@ -11,14 +11,6 @@ import ( "time" ) -// Cmdline responds with the running program's -// command line, with arguments separated by NUL bytes. -func Cmdline() ([]byte, error) { - var buf bytes.Buffer - fmt.Fprintf(&buf, strings.Join(os.Args, "\x00")) - return buf.Bytes(), nil -} - // goroutine // threadcreate // heap @@ -32,13 +24,36 @@ const ( CPUReq ReqType = "cpu" TraceReq ReqType = "trace" LookupReq ReqType = "profile" + + ErrProfileNotFoundPrefix = "Pprof profile not found" ) -// Profile +// NewErrProfileNotFound returns a new error caused by a pprof.Lookup +// profile not being found +func NewErrProfileNotFound(profile string) error { + return fmt.Errorf("%s %s", ErrProfileNotFoundPrefix, profile) +} + +// IsErrProfileNotFound returns whether the error is due to a pprof profile +// being invalid +func IsErrProfileNotFound(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrProfileNotFoundPrefix) +} + +// Cmdline responds with the running program's +// command line, with arguments separated by NUL bytes. +func Cmdline() ([]byte, error) { + var buf bytes.Buffer + fmt.Fprintf(&buf, strings.Join(os.Args, "\x00")) + return buf.Bytes(), nil +} + +// Profile generates a pprof.Profile report for the given profile name +// see runtime/pprof/pprof.go for available profiles. func Profile(profile string, debug int) ([]byte, error) { p := pprof.Lookup(profile) if p == nil { - return nil, fmt.Errorf("Unknown profile: %s", profile) + return nil, NewErrProfileNotFound(profile) } var buf bytes.Buffer @@ -49,7 +64,8 @@ func Profile(profile string, debug int) ([]byte, error) { return buf.Bytes(), nil } -func CPUProfile(sec int) ([]byte, error) { +// CPUProfile generates a CPU Profile for a given duration +func CPUProfile(ctx context.Context, sec int) ([]byte, error) { if sec <= 0 { sec = 1 } @@ -60,14 +76,15 @@ func CPUProfile(sec int) ([]byte, error) { return nil, err } - sleep(context.TODO(), time.Duration(sec)*time.Second) + sleep(ctx, time.Duration(sec)*time.Second) pprof.StopCPUProfile() return buf.Bytes(), nil } -func Trace(sec int) ([]byte, error) { +// Trace runs a trace profile for a given duration +func Trace(ctx context.Context, sec int) ([]byte, error) { if sec <= 0 { sec = 1 } @@ -86,6 +103,7 @@ func Trace(sec int) ([]byte, error) { } func sleep(ctx context.Context, d time.Duration) { + // Sleep until duration is met or ctx is cancelled select { case <-time.After(d): case <-ctx.Done(): diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 60d8c7b44..604eacc5b 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -13,6 +13,7 @@ import ( sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/monitor" + "github.com/hashicorp/nomad/command/agent/profile" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" @@ -27,6 +28,63 @@ func (m *Agent) register() { m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor) } +func (m *Agent) Profile(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error { + // Targeting a node, forward request to node + if args.NodeID != "" { + return m.forwardProfileClient(args, reply) + } + + currentServer := m.srv.serf.LocalMember().Name + var forwardServer bool + // Targeting a remote server which is not the leader and not this server + if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { + forwardServer = true + } + + // Targeting leader and this server is not current leader + if args.ServerID == "leader" && !m.srv.IsLeader() { + forwardServer = true + } + + if forwardServer { + // forward the request + return m.forwardProfileServer(args, reply) + } + + var resp []byte + var err error + + // Mark which server fulfilled the request + reply.AgentID = m.srv.serf.LocalMember().Name + + // Determine which profile to run + // and generate profile. Blocks for args.Seconds + switch args.ReqType { + case profile.CPUReq: + resp, err = profile.CPUProfile(context.TODO(), args.Seconds) + case profile.CmdReq: + resp, err = profile.Cmdline() + case profile.LookupReq: + resp, err = profile.Profile(args.Profile, args.Debug) + case profile.TraceReq: + resp, err = profile.Trace(context.TODO(), args.Seconds) + default: + err = structs.NewErrRPCCoded(404, "Unknown profile request type") + } + + if err != nil { + if profile.IsErrProfileNotFound(err) { + return structs.NewErrRPCCoded(404, err.Error()) + } + return structs.NewErrRPCCoded(500, err.Error()) + } + + // Copy profile response to reply + reply.Payload = resp + + return nil +} + func (m *Agent) monitor(conn io.ReadWriteCloser) { defer conn.Close() @@ -306,3 +364,96 @@ func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.Moni structs.Bridge(conn, serverConn) return } + +func (m *Agent) forwardProfileServer(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error { + var target *serverParts + serverID := args.ServerID + + // empty ServerID to prevent forwarding loop + args.ServerID = "" + + if serverID == "leader" { + isLeader, remoteServer := m.srv.getLeader() + if !isLeader && remoteServer != nil { + target = remoteServer + } + if !isLeader && remoteServer == nil { + return structs.NewErrRPCCoded(400, structs.ErrNoLeader.Error()) + } + } else { + // See if the server ID is a known member + serfMembers := m.srv.Members() + for _, mem := range serfMembers { + if mem.Name == serverID { + if ok, srv := isNomadServer(mem); ok { + target = srv + } + } + } + } + + // Unable to find a server + if target == nil { + err := fmt.Errorf("unknown nomad server %s", serverID) + return structs.NewErrRPCCoded(400, err.Error()) + } + + // Forward the request + rpcErr := m.srv.forwardServer(target, "Agent.Profile", args, reply) + if rpcErr != nil { + return structs.NewErrRPCCoded(500, rpcErr.Error()) + } + + return nil +} + +func (m *Agent) forwardProfileClient(args *cstructs.AgentPprofRequest, reply *cstructs.AgentPprofResponse) error { + nodeID := args.NodeID + + snap, err := m.srv.State().Snapshot() + if err != nil { + return structs.NewErrRPCCoded(500, err.Error()) + } + + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + return structs.NewErrRPCCoded(500, err.Error()) + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + return structs.NewErrRPCCoded(400, err.Error()) + } + + if err := nodeSupportsRpc(node); err != nil { + return structs.NewErrRPCCoded(400, err.Error()) + } + + // Get the Connection to the client either by fowarding to another server + // or creating direct stream + state, ok := m.srv.getNodeConn(nodeID) + if !ok { + // Determine the server that has a connection to the node + srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region()) + if err != nil { + code := 500 + if structs.IsErrNoNodeConn(err) { + code = 404 + } + return structs.NewErrRPCCoded(code, err.Error()) + } + + rpcErr := m.srv.forwardServer(srv, "Agent.Profile", args, reply) + if rpcErr != nil { + return structs.NewErrRPCCoded(500, err.Error()) + } + } else { + // NodeRpc + rpcErr := NodeRpc(state.Session, "Agent.Profile", args, reply) + if rpcErr != nil { + return structs.NewErrRPCCoded(500, err.Error()) + } + } + + return nil +} diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 77a23e558..ef93da44a 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/nomad/client/config" sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/command/agent/profile" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -456,3 +458,140 @@ func TestMonitor_Monitor_ACL(t *testing.T) { }) } } + +func TestAgentProfile_RemoteClient(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server and client + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + c, cleanup := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.GetConfig().RPCAddr.String()} + }) + defer cleanup() + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a clients") + }) + + req := cstructs.AgentPprofRequest{ + ReqType: profile.CPUReq, + NodeID: c.NodeID(), + } + + reply := cstructs.AgentPprofResponse{} + + err := s1.RPC("Agent.Profile", &req, &reply) + require.NoError(err) + + require.NotNil(reply.Payload) + require.Equal(c.NodeID(), reply.AgentID) +} + +func TestAgentProfile_Server(t *testing.T) { + t.Parallel() + + // start servers + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // determine leader and nonleader + servers := []*Server{s1, s2} + var nonLeader *Server + var leader *Server + for _, s := range servers { + if !s.IsLeader() { + nonLeader = s + } else { + leader = s + } + } + + cases := []struct { + desc string + serverID string + origin *Server + expectedErr string + expectedAgentID string + reqType profile.ReqType + }{ + { + desc: "remote leader", + serverID: "leader", + origin: nonLeader, + reqType: profile.CmdReq, + expectedAgentID: leader.serf.LocalMember().Name, + }, + { + desc: "remote server", + serverID: nonLeader.serf.LocalMember().Name, + origin: leader, + reqType: profile.CmdReq, + expectedAgentID: nonLeader.serf.LocalMember().Name, + }, + { + desc: "serverID is current leader", + serverID: "leader", + origin: leader, + reqType: profile.CmdReq, + expectedAgentID: leader.serf.LocalMember().Name, + }, + { + desc: "serverID is current server", + serverID: nonLeader.serf.LocalMember().Name, + origin: nonLeader, + reqType: profile.CPUReq, + expectedAgentID: nonLeader.serf.LocalMember().Name, + }, + { + desc: "serverID is unknown", + serverID: uuid.Generate(), + origin: nonLeader, + reqType: profile.CmdReq, + expectedErr: "unknown nomad server", + expectedAgentID: "", + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + + req := cstructs.AgentPprofRequest{ + ReqType: tc.reqType, + ServerID: tc.serverID, + } + + reply := cstructs.AgentPprofResponse{} + + err := tc.origin.RPC("Agent.Profile", &req, &reply) + if tc.expectedErr != "" { + require.Contains(err.Error(), tc.expectedErr) + } else { + require.Nil(err) + require.NotNil(reply.Payload) + } + + require.Equal(tc.expectedAgentID, reply.AgentID) + }) + } +} diff --git a/nomad/server.go b/nomad/server.go index e6c00c0e7..a902beb41 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1067,6 +1067,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.ClientStats) server.Register(s.staticEndpoints.ClientAllocations) server.Register(s.staticEndpoints.FileSystem) + server.Register(s.staticEndpoints.Agent) // Create new dynamic endpoints and add them to the RPC server. node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")}