open-nomad/api/agent.go

441 lines
11 KiB
Go
Raw Normal View History

2015-09-08 18:41:03 +00:00
package api
import (
2017-10-13 23:43:00 +00:00
"encoding/json"
2015-09-08 18:41:03 +00:00
"fmt"
"io/ioutil"
2015-09-08 21:26:53 +00:00
"net/url"
"strconv"
2015-09-08 18:41:03 +00:00
)
// Agent encapsulates an API client which talks to Nomad's
// agent endpoints for a specific node.
type Agent struct {
client *Client
// Cache static agent info
nodeName string
datacenter string
region string
2015-09-08 18:41:03 +00:00
}
// KeyringResponse is the single response shape shared by the keyring
// operations: install, remove, and use, as well as key listing queries.
type KeyringResponse struct {
	// Messages is a string-to-string map decoded from the response.
	// NOTE(review): presumably keyed by node name — confirm against the server.
	Messages map[string]string

	// Keys is a string-to-count map decoded from the response.
	// NOTE(review): presumably key -> number of nodes holding it — confirm.
	Keys map[string]int

	// NumNodes is the node count reported in the response.
	NumNodes int
}
// KeyringRequest is the request object sent for serf key operations.
type KeyringRequest struct {
	// Key is the key the operation applies to.
	Key string
}
2015-09-08 18:41:03 +00:00
// Agent builds a new Agent handle backed by this client. The handle is
// used to query the agent-specific endpoints; its cached fields start empty.
func (c *Client) Agent() *Agent {
	handle := new(Agent)
	handle.client = c
	return handle
}
// Self is used to query the /v1/agent/self endpoint and
// returns information specific to the running agent.
func (a *Agent) Self() (*AgentSelf, error) {
var out *AgentSelf
2015-09-08 18:41:03 +00:00
// Query the self endpoint on the agent
_, err := a.client.query("/v1/agent/self", &out, nil)
if err != nil {
return nil, fmt.Errorf("failed querying self endpoint: %s", err)
}
// Populate the cache for faster queries
a.populateCache(out)
2015-09-08 18:41:03 +00:00
return out, nil
}
// populateCache is used to insert various pieces of static
// data into the agent handle. This is used during subsequent
// lookups for the same data later on to save the round trip.
func (a *Agent) populateCache(self *AgentSelf) {
if a.nodeName == "" {
a.nodeName = self.Member.Name
}
if a.datacenter == "" {
2017-03-28 03:54:43 +00:00
if val, ok := self.Config["Datacenter"]; ok {
a.datacenter, _ = val.(string)
}
}
if a.region == "" {
2017-03-28 03:54:43 +00:00
if val, ok := self.Config["Region"]; ok {
a.region, _ = val.(string)
}
}
}
// NodeName reports the Nomad agent's node name. The cached value is
// returned when present; otherwise the agent is queried via Self, which
// fills the cache as a side effect.
func (a *Agent) NodeName() (string, error) {
	if a.nodeName == "" {
		// Cache miss: ask the agent, which populates a.nodeName on success.
		if _, err := a.Self(); err != nil {
			return a.nodeName, err
		}
	}
	return a.nodeName, nil
}
// Datacenter reports the name of the datacenter which the agent is a
// member of. The cached value is returned when present; otherwise the
// agent is queried via Self, which fills the cache as a side effect.
func (a *Agent) Datacenter() (string, error) {
	if a.datacenter == "" {
		// Cache miss: ask the agent, which populates a.datacenter on success.
		if _, err := a.Self(); err != nil {
			return a.datacenter, err
		}
	}
	return a.datacenter, nil
}
// Region looks up the region the agent is in. The cached value is
// returned when present; otherwise the agent is queried via Self, which
// fills the cache as a side effect.
func (a *Agent) Region() (string, error) {
	if a.region == "" {
		// Cache miss: ask the agent, which populates a.region on success.
		if _, err := a.Self(); err != nil {
			return a.region, err
		}
	}
	return a.region, nil
}
2015-09-08 21:26:53 +00:00
// Join is used to instruct a server node to join another server
// via the gossip protocol. Multiple addresses may be specified.
2015-09-11 22:40:54 +00:00
// We attempt to join all of the hosts in the list. Returns the
// number of nodes successfully joined and any error. If one or
2015-09-08 21:26:53 +00:00
// more nodes have a successful result, no error is returned.
2015-09-11 22:40:54 +00:00
func (a *Agent) Join(addrs ...string) (int, error) {
2015-09-08 21:26:53 +00:00
// Accumulate the addresses
v := url.Values{}
for _, addr := range addrs {
v.Add("address", addr)
}
// Send the join request
var resp joinResponse
_, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
if err != nil {
2015-09-11 22:40:54 +00:00
return 0, fmt.Errorf("failed joining: %s", err)
2015-09-08 21:26:53 +00:00
}
if resp.Error != "" {
2015-09-11 22:40:54 +00:00
return 0, fmt.Errorf("failed joining: %s", resp.Error)
2015-09-08 21:26:53 +00:00
}
2015-09-14 21:04:30 +00:00
return resp.NumJoined, nil
2015-09-08 21:26:53 +00:00
}
2015-09-08 21:47:29 +00:00
// Members is used to query all of the known server members
func (a *Agent) Members() (*ServerMembers, error) {
var resp *ServerMembers
2015-09-08 21:47:29 +00:00
// Query the known members
_, err := a.client.query("/v1/agent/members", &resp, nil)
if err != nil {
return nil, err
}
return resp, nil
}
// ForceLeave is used to eject an existing node from the cluster.
// The node name is sent as the "node" query parameter of
// /v1/agent/force-leave; any error from the write is returned as-is.
func (a *Agent) ForceLeave(node string) error {
	// Encode the node name so that characters such as '&', '#', '+', or
	// spaces cannot corrupt the query string or inject extra parameters.
	v := url.Values{}
	v.Set("node", node)

	_, err := a.client.write("/v1/agent/force-leave?"+v.Encode(), nil, nil, nil)
	return err
}
2015-09-25 04:17:33 +00:00
// Servers queries the list of servers on a client node. It returns the
// decoded /v1/agent/servers response, or nil and the error on failure.
func (a *Agent) Servers() ([]string, error) {
	var servers []string
	if _, err := a.client.query("/v1/agent/servers", &servers, nil); err != nil {
		return nil, err
	}
	return servers, nil
}
// SetServers updates the list of servers on a client node. Each entry of
// addrs is sent as a repeated "address" query parameter.
func (a *Agent) SetServers(addrs []string) error {
	// Collect every address under the same query key.
	params := url.Values{}
	for i := range addrs {
		params.Add("address", addrs[i])
	}

	endpoint := "/v1/agent/servers?" + params.Encode()
	_, err := a.client.write(endpoint, nil, nil, nil)
	return err
}
// ListKeys returns the list of installed keys, or nil and the error when
// the /v1/agent/keyring/list query fails.
func (a *Agent) ListKeys() (*KeyringResponse, error) {
	keys := new(KeyringResponse)
	if _, err := a.client.query("/v1/agent/keyring/list", keys, nil); err != nil {
		return nil, err
	}
	return keys, nil
}
// InstallKey installs a key in the keyrings of all the serf members.
// The response pointer is always non-nil, even when an error is returned.
func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
	req := &KeyringRequest{Key: key}
	out := new(KeyringResponse)
	_, err := a.client.write("/v1/agent/keyring/install", req, out, nil)
	return out, err
}
// UseKey uses a key from the keyring of serf members.
// The response pointer is always non-nil, even when an error is returned.
func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
	req := &KeyringRequest{Key: key}
	out := new(KeyringResponse)
	_, err := a.client.write("/v1/agent/keyring/use", req, out, nil)
	return out, err
}
// RemoveKey removes a particular key from keyrings of serf members.
// The response pointer is always non-nil, even when an error is returned.
func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
	req := &KeyringRequest{Key: key}
	out := new(KeyringResponse)
	_, err := a.client.write("/v1/agent/keyring/remove", req, out, nil)
	return out, err
}
2017-10-13 23:43:00 +00:00
// Health queries the agent's health
func (a *Agent) Health() (*AgentHealthResponse, error) {
req, err := a.client.newRequest("GET", "/v1/agent/health")
if err != nil {
return nil, err
}
var health AgentHealthResponse
_, resp, err := a.client.doRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Always try to decode the response as JSON
err = json.NewDecoder(resp.Body).Decode(&health)
if err == nil {
return &health, nil
}
// Return custom error when response is not expected JSON format
2018-03-11 19:07:44 +00:00
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
2017-10-13 23:43:00 +00:00
}
// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
if err != nil {
errCh <- err
return nil, errCh
}
r.setQueryOptions(q)
_, resp, err := requireOK(a.client.doRequest(r))
if err != nil {
errCh <- err
return nil, errCh
}
frames := make(chan *StreamFrame, 10)
go func() {
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
for {
select {
case <-stopCh:
close(frames)
return
default:
}
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
errCh <- err
return
}
// Discard heartbeat frame
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()
2019-11-01 14:33:28 +00:00
return frames, errCh
}
// PprofOptions holds the parameters used when profiling a node or server.
type PprofOptions struct {
	// ServerID selects the server a profile should run on. It may be a
	// server ID, a server name, or the special value "leader".
	ServerID string

	// NodeID selects the node a profile should run on.
	NodeID string

	// Seconds is how long a profile should run. It only applies to
	// certain runtime profiles such as CPU and Trace.
	Seconds int

	// GC determines whether runtime.GC() should be called before a heap
	// profile is taken.
	GC int

	// Debug specifies whether the output of a lookup profile should be
	// returned in human readable format instead of binary.
	Debug int
}
// CPUProfile fetches a runtime/pprof cpu profile for a given server or node
// by requesting the "profile" pprof endpoint.
//
// The profile runs for opts.Seconds seconds. If no serverID or nodeID are
// provided the current Agents server will be used.
// NOTE(review): the default of 1 second and the server fallback are not
// applied in this client — presumably handled by the agent; confirm.
//
// The call blocks until the profile finishes, and returns the raw bytes of
// the profile.
func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
	const endpoint = "profile"
	return a.pprofRequest(endpoint, opts, q)
}
// Trace fetches a runtime/pprof trace for a given server or node by
// requesting the "trace" pprof endpoint.
//
// The trace runs for opts.Seconds seconds. If no serverID or nodeID are
// provided the current Agents server will be used.
// NOTE(review): the default of 1 second and the server fallback are not
// applied in this client — presumably handled by the agent; confirm.
//
// The call blocks until the trace finishes, and returns its raw bytes.
func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
	const endpoint = "trace"
	return a.pprofRequest(endpoint, opts, q)
}
// Lookup fetches the runtime/pprof profile named by profile; the name is
// used as the final path segment of the pprof endpoint. Accepts a client or
// server ID but not both simultaneously.
// NOTE(review): that restriction is not enforced in this client — confirm
// it is checked by the agent.
//
// The call blocks until the profile finishes, and returns the raw bytes of
// the profile unless debug is set.
func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
	raw, err := a.pprofRequest(profile, opts, q)
	return raw, err
}
// pprofRequest issues a raw query against /v1/agent/pprof/<req> and returns
// the full response body.
//
// The profiling options are passed as the query parameters "seconds",
// "debug", "gc", "node_id", and "server_id". They are written into
// q.Params, so a non-nil q supplied by the caller is modified. A nil q is
// replaced with an empty QueryOptions.
func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
	if q == nil {
		q = &QueryOptions{}
	}
	if q.Params == nil {
		q.Params = make(map[string]string)
	}

	q.Params["seconds"] = strconv.Itoa(opts.Seconds)
	q.Params["debug"] = strconv.Itoa(opts.Debug)
	q.Params["gc"] = strconv.Itoa(opts.GC)
	q.Params["node_id"] = opts.NodeID
	q.Params["server_id"] = opts.ServerID

	body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
	if err != nil {
		return nil, err
	}
	// Close the body once it has been read so the underlying connection
	// is released rather than leaked.
	// NOTE(review): relies on rawQuery returning an io.ReadCloser — confirm.
	defer body.Close()

	resp, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
2015-09-08 21:26:53 +00:00
// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
	// NumJoined is decoded from the "num_joined" field of the response.
	NumJoined int `json:"num_joined"`

	// Error is decoded from the "error" field; Join treats a non-empty
	// value as a failure.
	Error string `json:"error"`
}
2015-09-08 21:47:29 +00:00
// ServerMembers is the response type returned by Agent.Members.
type ServerMembers struct {
	// NOTE(review): the three Server* fields presumably describe the
	// server that answered the query — confirm against the endpoint.
	ServerName   string
	ServerRegion string
	ServerDC     string

	// Members is the list of members carried in the response.
	Members []*AgentMember
}
// AgentSelf is the response type returned by Agent.Self.
type AgentSelf struct {
	// Config is decoded from the "config" field. populateCache reads the
	// "Datacenter" and "Region" entries from it.
	Config map[string]interface{} `json:"config"`

	// Member is decoded from the "member" field. populateCache reads the
	// node name from Member.Name.
	Member AgentMember `json:"member"`

	// Stats is decoded from the "stats" field.
	Stats map[string]map[string]string `json:"stats"`
}
2015-09-08 21:47:29 +00:00
// AgentMember represents a cluster member known to the agent.
type AgentMember struct {
	Name string
	Addr string
	Port uint16

	// Tags holds string key/value pairs for the member.
	// AgentMembersNameSort orders members by the "region" and "dc" tags.
	Tags   map[string]string
	Status string

	// NOTE(review): the Protocol* and Delegate* fields presumably carry the
	// member's minimum, maximum, and current version numbers — confirm.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}
// AgentMembersNameSort implements sort.Interface for []*AgentMember.
// Members are ordered by the "region" tag first, then by the "dc" tag,
// and finally by Name.
type AgentMembersNameSort []*AgentMember

// Len reports the number of members in the collection.
func (a AgentMembersNameSort) Len() int {
	return len(a)
}

// Swap exchanges the members at positions i and j.
func (a AgentMembersNameSort) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the member at i sorts before the member at j,
// comparing the "region" tag, then the "dc" tag, then Name.
func (a AgentMembersNameSort) Less(i, j int) bool {
	left, right := a[i], a[j]
	switch {
	case left.Tags["region"] != right.Tags["region"]:
		return left.Tags["region"] < right.Tags["region"]
	case left.Tags["dc"] != right.Tags["dc"]:
		return left.Tags["dc"] < right.Tags["dc"]
	default:
		return left.Name < right.Name
	}
}
2017-10-13 23:43:00 +00:00
2018-03-11 17:54:04 +00:00
// AgentHealthResponse is the response from the Health endpoint describing an
2017-10-13 23:43:00 +00:00
// agent's health.
type AgentHealthResponse struct {
Client *AgentHealth `json:"client,omitempty"`
Server *AgentHealth `json:"server,omitempty"`
}
// AgentHealth reports the health of either the Client or the Server part
// of an agent, as returned by a Health request.
type AgentHealth struct {
	// Ok is false if the agent is unhealthy.
	Ok bool `json:"ok"`

	// Message describes why the agent is unhealthy.
	Message string `json:"message"`
}