45210ed901
Address pr feedback, rename profile package to pprof to more accurately describe its purpose. Adds gc param for heap lookup profiles.
472 lines
12 KiB
Go
472 lines
12 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/url"
|
|
"strconv"
|
|
)
|
|
|
|
// Agent encapsulates an API client which talks to Nomad's
|
|
// agent endpoints for a specific node.
|
|
type Agent struct {
|
|
client *Client
|
|
|
|
// Cache static agent info
|
|
nodeName string
|
|
datacenter string
|
|
region string
|
|
}
|
|
|
|
// KeyringResponse is the single response shape shared by every keyring
// operation: listing, installing, using and removing keys.
type KeyringResponse struct {
	Messages map[string]string
	Keys     map[string]int
	NumNodes int
}
|
|
|
|
// KeyringRequest is the request body sent for the key operations
// (install, use, remove) that act on a single key.
type KeyringRequest struct {
	Key string
}
|
|
|
|
// Agent returns a new agent which can be used to query
|
|
// the agent-specific endpoints.
|
|
func (c *Client) Agent() *Agent {
|
|
return &Agent{client: c}
|
|
}
|
|
|
|
// Self is used to query the /v1/agent/self endpoint and
|
|
// returns information specific to the running agent.
|
|
func (a *Agent) Self() (*AgentSelf, error) {
|
|
var out *AgentSelf
|
|
|
|
// Query the self endpoint on the agent
|
|
_, err := a.client.query("/v1/agent/self", &out, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed querying self endpoint: %s", err)
|
|
}
|
|
|
|
// Populate the cache for faster queries
|
|
a.populateCache(out)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// populateCache is used to insert various pieces of static
|
|
// data into the agent handle. This is used during subsequent
|
|
// lookups for the same data later on to save the round trip.
|
|
func (a *Agent) populateCache(self *AgentSelf) {
|
|
if a.nodeName == "" {
|
|
a.nodeName = self.Member.Name
|
|
}
|
|
if a.datacenter == "" {
|
|
if val, ok := self.Config["Datacenter"]; ok {
|
|
a.datacenter, _ = val.(string)
|
|
}
|
|
}
|
|
if a.region == "" {
|
|
if val, ok := self.Config["Region"]; ok {
|
|
a.region, _ = val.(string)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NodeName is used to query the Nomad agent for its node name.
|
|
func (a *Agent) NodeName() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.nodeName != "" {
|
|
return a.nodeName, nil
|
|
}
|
|
|
|
// Query the node name
|
|
_, err := a.Self()
|
|
return a.nodeName, err
|
|
}
|
|
|
|
// Datacenter is used to return the name of the datacenter which
|
|
// the agent is a member of.
|
|
func (a *Agent) Datacenter() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.datacenter != "" {
|
|
return a.datacenter, nil
|
|
}
|
|
|
|
// Query the agent for the DC
|
|
_, err := a.Self()
|
|
return a.datacenter, err
|
|
}
|
|
|
|
// Region is used to look up the region the agent is in.
|
|
func (a *Agent) Region() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.region != "" {
|
|
return a.region, nil
|
|
}
|
|
|
|
// Query the agent for the region
|
|
_, err := a.Self()
|
|
return a.region, err
|
|
}
|
|
|
|
// Join is used to instruct a server node to join another server
|
|
// via the gossip protocol. Multiple addresses may be specified.
|
|
// We attempt to join all of the hosts in the list. Returns the
|
|
// number of nodes successfully joined and any error. If one or
|
|
// more nodes have a successful result, no error is returned.
|
|
func (a *Agent) Join(addrs ...string) (int, error) {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
// Send the join request
|
|
var resp joinResponse
|
|
_, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed joining: %s", err)
|
|
}
|
|
if resp.Error != "" {
|
|
return 0, fmt.Errorf("failed joining: %s", resp.Error)
|
|
}
|
|
return resp.NumJoined, nil
|
|
}
|
|
|
|
// Members is used to query all of the known server members
|
|
func (a *Agent) Members() (*ServerMembers, error) {
|
|
var resp *ServerMembers
|
|
|
|
// Query the known members
|
|
_, err := a.client.query("/v1/agent/members", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// ForceLeave is used to eject an existing node from the cluster.
|
|
func (a *Agent) ForceLeave(node string) error {
|
|
_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// Servers is used to query the list of servers on a client node.
|
|
func (a *Agent) Servers() ([]string, error) {
|
|
var resp []string
|
|
_, err := a.client.query("/v1/agent/servers", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// SetServers is used to update the list of servers on a client node.
|
|
func (a *Agent) SetServers(addrs []string) error {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
_, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// ListKeys returns the list of installed keys
|
|
func (a *Agent) ListKeys() (*KeyringResponse, error) {
|
|
var resp KeyringResponse
|
|
_, err := a.client.query("/v1/agent/keyring/list", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
// InstallKey installs a key in the keyrings of all the serf members
|
|
func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// UseKey uses a key from the keyring of serf members
|
|
func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// RemoveKey removes a particular key from keyrings of serf members
|
|
func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// Health queries the agent's health
|
|
func (a *Agent) Health() (*AgentHealthResponse, error) {
|
|
req, err := a.client.newRequest("GET", "/v1/agent/health")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var health AgentHealthResponse
|
|
_, resp, err := a.client.doRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Always try to decode the response as JSON
|
|
err = json.NewDecoder(resp.Body).Decode(&health)
|
|
if err == nil {
|
|
return &health, nil
|
|
}
|
|
|
|
// Return custom error when response is not expected JSON format
|
|
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
|
|
}
|
|
|
|
// Monitor returns a channel which will receive streaming logs from the agent
|
|
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
|
|
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
|
errCh := make(chan error, 1)
|
|
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
r.setQueryOptions(q)
|
|
_, resp, err := requireOK(a.client.doRequest(r))
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
frames := make(chan *StreamFrame, 10)
|
|
go func() {
|
|
defer resp.Body.Close()
|
|
|
|
dec := json.NewDecoder(resp.Body)
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
close(frames)
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Decode the next frame
|
|
var frame StreamFrame
|
|
if err := dec.Decode(&frame); err != nil {
|
|
close(frames)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
// Discard heartbeat frame
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
frames <- &frame
|
|
}
|
|
}()
|
|
|
|
return frames, errCh
|
|
}
|
|
|
|
// CPUProfile returns a runtime/pprof cpu profile for a given server or node.
|
|
// The profile will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The parameters are:
|
|
// * serverID: server ID or name to query, also accepts "leader"
|
|
// * nodeID: client node ID to query
|
|
// * seconds: the amount of time to run the trace for.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) CPUProfile(serverID, nodeID string, seconds int, q *QueryOptions) ([]byte, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
q.Params["seconds"] = strconv.Itoa(seconds)
|
|
q.Params["node_id"] = nodeID
|
|
q.Params["server_id"] = serverID
|
|
|
|
body, err := a.client.rawQuery("/v1/agent/pprof/profile", q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Trace returns a runtime/pprof trace for a given server or node.
|
|
// The trace will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The parameters are:
|
|
// * serverID: server ID or name to query, also accepts "leader"
|
|
// * nodeID: client node ID to query
|
|
// * seconds: the amount of time to run the trace for.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) Trace(serverID, nodeID string, seconds int, q *QueryOptions) ([]byte, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
q.Params["seconds"] = strconv.Itoa(seconds)
|
|
q.Params["node_id"] = nodeID
|
|
q.Params["server_id"] = serverID
|
|
|
|
body, err := a.client.rawQuery("/v1/agent/pprof/trace", q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
// Profile returns a runtime/pprof profile using pprof.Lookup to determine
|
|
// which profile to run. Accepts a client or server ID but not both simultaneously.
|
|
//
|
|
// The parameters are:
|
|
// * serverID: server ID or name to query, also accepts "leader"
|
|
// * nodeID: client node ID to query
|
|
// * profile: the name of the runtime/pprof profile to lookup and run.
|
|
// * debug: flag to specify if the profile should return human readable output.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) Profile(serverID, nodeID, profile string, debug int, q *QueryOptions) ([]byte, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
q.Params["debug"] = strconv.Itoa(debug)
|
|
q.Params["node_id"] = nodeID
|
|
q.Params["server_id"] = serverID
|
|
|
|
body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", profile), q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// joinResponse is the JSON body returned by the agent's join endpoint: how
// many nodes were joined, plus an error message (empty when none).
type joinResponse struct {
	NumJoined int    `json:"num_joined"`
	Error     string `json:"error"`
}
|
|
|
|
type ServerMembers struct {
|
|
ServerName string
|
|
ServerRegion string
|
|
ServerDC string
|
|
Members []*AgentMember
|
|
}
|
|
|
|
type AgentSelf struct {
|
|
Config map[string]interface{} `json:"config"`
|
|
Member AgentMember `json:"member"`
|
|
Stats map[string]map[string]string `json:"stats"`
|
|
}
|
|
|
|
// AgentMember describes one cluster member as known to the agent: its
// identity, address, tags, status and supported version ranges.
type AgentMember struct {
	Name   string
	Addr   string
	Port   uint16
	Tags   map[string]string
	Status string

	ProtocolMin, ProtocolMax, ProtocolCur uint8
	DelegateMin, DelegateMax, DelegateCur uint8
}
|
|
|
|
// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort
|
|
// based on the Name, DC and Region
|
|
type AgentMembersNameSort []*AgentMember
|
|
|
|
func (a AgentMembersNameSort) Len() int { return len(a) }
|
|
func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a AgentMembersNameSort) Less(i, j int) bool {
|
|
if a[i].Tags["region"] != a[j].Tags["region"] {
|
|
return a[i].Tags["region"] < a[j].Tags["region"]
|
|
}
|
|
|
|
if a[i].Tags["dc"] != a[j].Tags["dc"] {
|
|
return a[i].Tags["dc"] < a[j].Tags["dc"]
|
|
}
|
|
|
|
return a[i].Name < a[j].Name
|
|
|
|
}
|
|
|
|
// AgentHealthResponse is the response from the Health endpoint describing an
|
|
// agent's health.
|
|
type AgentHealthResponse struct {
|
|
Client *AgentHealth `json:"client,omitempty"`
|
|
Server *AgentHealth `json:"server,omitempty"`
|
|
}
|
|
|
|
// AgentHealth is the health report for either the client or the server half
// of an agent, as returned inside an AgentHealthResponse.
type AgentHealth struct {
	// Ok reports whether the agent is healthy; false means unhealthy.
	Ok bool `json:"ok"`

	// Message explains why the agent is unhealthy.
	Message string `json:"message"`
}
|