f8a133a810
When a cluster doesn't have a leader, the `nomad operator debug` command can safely use stale queries to gracefully degrade the consistency of almost all its queries. The query parameter for these API calls was not being set by the command. Some `api` package queries do not include `QueryOptions` because they target a specific agent, but they can potentially be forwarded to other agents. If there is no leader, these forwarded queries will fail. Provide methods to call these APIs with `QueryOptions`.
497 lines
13 KiB
Go
497 lines
13 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/url"
|
|
"strconv"
|
|
)
|
|
|
|
// Agent encapsulates an API client which talks to Nomad's
|
|
// agent endpoints for a specific node.
|
|
type Agent struct {
|
|
client *Client
|
|
|
|
// Cache static agent info
|
|
nodeName string
|
|
datacenter string
|
|
region string
|
|
}
|
|
|
|
// KeyringResponse is a unified key response and can be used for install,
|
|
// remove, use, as well as listing key queries.
|
|
type KeyringResponse struct {
|
|
Messages map[string]string
|
|
Keys map[string]int
|
|
NumNodes int
|
|
}
|
|
|
|
// KeyringRequest is request objects for serf key operations.
|
|
type KeyringRequest struct {
|
|
Key string
|
|
}
|
|
|
|
// Agent returns a new agent which can be used to query
|
|
// the agent-specific endpoints.
|
|
func (c *Client) Agent() *Agent {
|
|
return &Agent{client: c}
|
|
}
|
|
|
|
// Self is used to query the /v1/agent/self endpoint and
|
|
// returns information specific to the running agent.
|
|
func (a *Agent) Self() (*AgentSelf, error) {
|
|
var out *AgentSelf
|
|
|
|
// Query the self endpoint on the agent
|
|
_, err := a.client.query("/v1/agent/self", &out, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed querying self endpoint: %s", err)
|
|
}
|
|
|
|
// Populate the cache for faster queries
|
|
a.populateCache(out)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// populateCache is used to insert various pieces of static
|
|
// data into the agent handle. This is used during subsequent
|
|
// lookups for the same data later on to save the round trip.
|
|
func (a *Agent) populateCache(self *AgentSelf) {
|
|
if a.nodeName == "" {
|
|
a.nodeName = self.Member.Name
|
|
}
|
|
if a.datacenter == "" {
|
|
if val, ok := self.Config["Datacenter"]; ok {
|
|
a.datacenter, _ = val.(string)
|
|
}
|
|
}
|
|
if a.region == "" {
|
|
if val, ok := self.Config["Region"]; ok {
|
|
a.region, _ = val.(string)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NodeName is used to query the Nomad agent for its node name.
|
|
func (a *Agent) NodeName() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.nodeName != "" {
|
|
return a.nodeName, nil
|
|
}
|
|
|
|
// Query the node name
|
|
_, err := a.Self()
|
|
return a.nodeName, err
|
|
}
|
|
|
|
// Datacenter is used to return the name of the datacenter which
|
|
// the agent is a member of.
|
|
func (a *Agent) Datacenter() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.datacenter != "" {
|
|
return a.datacenter, nil
|
|
}
|
|
|
|
// Query the agent for the DC
|
|
_, err := a.Self()
|
|
return a.datacenter, err
|
|
}
|
|
|
|
// Region is used to look up the region the agent is in.
|
|
func (a *Agent) Region() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.region != "" {
|
|
return a.region, nil
|
|
}
|
|
|
|
// Query the agent for the region
|
|
_, err := a.Self()
|
|
return a.region, err
|
|
}
|
|
|
|
// Join is used to instruct a server node to join another server
|
|
// via the gossip protocol. Multiple addresses may be specified.
|
|
// We attempt to join all of the hosts in the list. Returns the
|
|
// number of nodes successfully joined and any error. If one or
|
|
// more nodes have a successful result, no error is returned.
|
|
func (a *Agent) Join(addrs ...string) (int, error) {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
// Send the join request
|
|
var resp joinResponse
|
|
_, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed joining: %s", err)
|
|
}
|
|
if resp.Error != "" {
|
|
return 0, fmt.Errorf("failed joining: %s", resp.Error)
|
|
}
|
|
return resp.NumJoined, nil
|
|
}
|
|
|
|
// Members is used to query all of the known server members
|
|
func (a *Agent) Members() (*ServerMembers, error) {
|
|
var resp *ServerMembers
|
|
|
|
// Query the known members
|
|
_, err := a.client.query("/v1/agent/members", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// Members is used to query all of the known server members
|
|
// with the ability to set QueryOptions
|
|
func (a *Agent) MembersOpts(opts *QueryOptions) (*ServerMembers, error) {
|
|
var resp *ServerMembers
|
|
_, err := a.client.query("/v1/agent/members", &resp, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// ForceLeave is used to eject an existing node from the cluster.
|
|
func (a *Agent) ForceLeave(node string) error {
|
|
_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// Servers is used to query the list of servers on a client node.
|
|
func (a *Agent) Servers() ([]string, error) {
|
|
var resp []string
|
|
_, err := a.client.query("/v1/agent/servers", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// SetServers is used to update the list of servers on a client node.
|
|
func (a *Agent) SetServers(addrs []string) error {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
_, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// ListKeys returns the list of installed keys
|
|
func (a *Agent) ListKeys() (*KeyringResponse, error) {
|
|
var resp KeyringResponse
|
|
_, err := a.client.query("/v1/agent/keyring/list", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
// InstallKey installs a key in the keyrings of all the serf members
|
|
func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// UseKey uses a key from the keyring of serf members
|
|
func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// RemoveKey removes a particular key from keyrings of serf members
|
|
func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// Health queries the agent's health
|
|
func (a *Agent) Health() (*AgentHealthResponse, error) {
|
|
req, err := a.client.newRequest("GET", "/v1/agent/health")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var health AgentHealthResponse
|
|
_, resp, err := a.client.doRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Always try to decode the response as JSON
|
|
err = json.NewDecoder(resp.Body).Decode(&health)
|
|
if err == nil {
|
|
return &health, nil
|
|
}
|
|
|
|
// Return custom error when response is not expected JSON format
|
|
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
|
|
}
|
|
|
|
// Host returns debugging context about the agent's host operating system
|
|
func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataResponse, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
if serverID != "" {
|
|
q.Params["server_id"] = serverID
|
|
}
|
|
|
|
if nodeID != "" {
|
|
q.Params["node_id"] = nodeID
|
|
}
|
|
|
|
var resp HostDataResponse
|
|
_, err := a.client.query("/v1/agent/host", &resp, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &resp, nil
|
|
}
|
|
|
|
// Monitor returns a channel which will receive streaming logs from the agent
|
|
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
|
|
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
|
errCh := make(chan error, 1)
|
|
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
r.setQueryOptions(q)
|
|
_, resp, err := requireOK(a.client.doRequest(r))
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
frames := make(chan *StreamFrame, 10)
|
|
go func() {
|
|
defer resp.Body.Close()
|
|
|
|
dec := json.NewDecoder(resp.Body)
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
close(frames)
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Decode the next frame
|
|
var frame StreamFrame
|
|
if err := dec.Decode(&frame); err != nil {
|
|
close(frames)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
// Discard heartbeat frame
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
frames <- &frame
|
|
}
|
|
}()
|
|
|
|
return frames, errCh
|
|
}
|
|
|
|
// PprofOptions contain a set of parameters for profiling a node or server.
|
|
type PprofOptions struct {
|
|
// ServerID is the server ID, name, or special value "leader" to
|
|
// specify the server that a given profile should be run on.
|
|
ServerID string
|
|
|
|
// NodeID is the node ID that a given profile should be run on.
|
|
NodeID string
|
|
|
|
// Seconds specifies the amount of time a profile should be run for.
|
|
// Seconds only applies for certain runtime profiles like CPU and Trace.
|
|
Seconds int
|
|
|
|
// GC determines if a runtime.GC() should be called before a heap
|
|
// profile.
|
|
GC int
|
|
|
|
// Debug specifies if the output of a lookup profile should be returned
|
|
// in human readable format instead of binary.
|
|
Debug int
|
|
}
|
|
|
|
// CPUProfile returns a runtime/pprof cpu profile for a given server or node.
|
|
// The profile will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest("profile", opts, q)
|
|
}
|
|
|
|
// Trace returns a runtime/pprof trace for a given server or node.
|
|
// The trace will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest("trace", opts, q)
|
|
}
|
|
|
|
// Lookup returns a runtime/pprof profile using pprof.Lookup to determine
|
|
// which profile to run. Accepts a client or server ID but not both simultaneously.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile unless debug is set.
|
|
func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest(profile, opts, q)
|
|
}
|
|
|
|
func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
q.Params["seconds"] = strconv.Itoa(opts.Seconds)
|
|
q.Params["debug"] = strconv.Itoa(opts.Debug)
|
|
q.Params["gc"] = strconv.Itoa(opts.GC)
|
|
q.Params["node_id"] = opts.NodeID
|
|
q.Params["server_id"] = opts.ServerID
|
|
|
|
body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := ioutil.ReadAll(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// joinResponse is used to decode the response we get while
|
|
// sending a member join request.
|
|
type joinResponse struct {
|
|
NumJoined int `json:"num_joined"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
type ServerMembers struct {
|
|
ServerName string
|
|
ServerRegion string
|
|
ServerDC string
|
|
Members []*AgentMember
|
|
}
|
|
|
|
type AgentSelf struct {
|
|
Config map[string]interface{} `json:"config"`
|
|
Member AgentMember `json:"member"`
|
|
Stats map[string]map[string]string `json:"stats"`
|
|
}
|
|
|
|
// AgentMember represents a cluster member known to the agent
|
|
type AgentMember struct {
|
|
Name string
|
|
Addr string
|
|
Port uint16
|
|
Tags map[string]string
|
|
Status string
|
|
ProtocolMin uint8
|
|
ProtocolMax uint8
|
|
ProtocolCur uint8
|
|
DelegateMin uint8
|
|
DelegateMax uint8
|
|
DelegateCur uint8
|
|
}
|
|
|
|
// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort
|
|
// based on the Name, DC and Region
|
|
type AgentMembersNameSort []*AgentMember
|
|
|
|
func (a AgentMembersNameSort) Len() int { return len(a) }
|
|
func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a AgentMembersNameSort) Less(i, j int) bool {
|
|
if a[i].Tags["region"] != a[j].Tags["region"] {
|
|
return a[i].Tags["region"] < a[j].Tags["region"]
|
|
}
|
|
|
|
if a[i].Tags["dc"] != a[j].Tags["dc"] {
|
|
return a[i].Tags["dc"] < a[j].Tags["dc"]
|
|
}
|
|
|
|
return a[i].Name < a[j].Name
|
|
|
|
}
|
|
|
|
// AgentHealthResponse is the response from the Health endpoint describing an
|
|
// agent's health.
|
|
type AgentHealthResponse struct {
|
|
Client *AgentHealth `json:"client,omitempty"`
|
|
Server *AgentHealth `json:"server,omitempty"`
|
|
}
|
|
|
|
// AgentHealth describes the Client or Server's health in a Health request.
|
|
type AgentHealth struct {
|
|
// Ok is false if the agent is unhealthy
|
|
Ok bool `json:"ok"`
|
|
|
|
// Message describes why the agent is unhealthy
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type HostData struct {
|
|
OS string
|
|
Network []map[string]string
|
|
ResolvConf string
|
|
Hosts string
|
|
Environment map[string]string
|
|
Disk map[string]DiskUsage
|
|
}
|
|
|
|
type DiskUsage struct {
|
|
DiskMB int64
|
|
UsedMB int64
|
|
}
|
|
|
|
type HostDataResponse struct {
|
|
AgentID string
|
|
HostData *HostData `json:",omitempty"`
|
|
}
|