575 lines
16 KiB
Go
575 lines
16 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strconv"
|
|
)
|
|
|
|
// Agent encapsulates an API client which talks to Nomad's
|
|
// agent endpoints for a specific node.
|
|
type Agent struct {
|
|
client *Client
|
|
|
|
// Cache static agent info
|
|
nodeName string
|
|
datacenter string
|
|
region string
|
|
}
|
|
|
|
// KeyringResponse is a unified key response and can be used for install,
|
|
// remove, use, as well as listing key queries.
|
|
type KeyringResponse struct {
|
|
Messages map[string]string
|
|
Keys map[string]int
|
|
NumNodes int
|
|
}
|
|
|
|
// KeyringRequest is request objects for serf key operations.
|
|
type KeyringRequest struct {
|
|
Key string
|
|
}
|
|
|
|
// Agent returns a new agent which can be used to query
|
|
// the agent-specific endpoints.
|
|
func (c *Client) Agent() *Agent {
|
|
return &Agent{client: c}
|
|
}
|
|
|
|
// Self is used to query the /v1/agent/self endpoint and
|
|
// returns information specific to the running agent.
|
|
func (a *Agent) Self() (*AgentSelf, error) {
|
|
var out *AgentSelf
|
|
|
|
// Query the self endpoint on the agent
|
|
_, err := a.client.query("/v1/agent/self", &out, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed querying self endpoint: %s", err)
|
|
}
|
|
|
|
// Populate the cache for faster queries
|
|
a.populateCache(out)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// populateCache is used to insert various pieces of static
|
|
// data into the agent handle. This is used during subsequent
|
|
// lookups for the same data later on to save the round trip.
|
|
func (a *Agent) populateCache(self *AgentSelf) {
|
|
if a.nodeName == "" {
|
|
a.nodeName = self.Member.Name
|
|
}
|
|
if a.datacenter == "" {
|
|
if val, ok := self.Config["Datacenter"]; ok {
|
|
a.datacenter, _ = val.(string)
|
|
}
|
|
}
|
|
if a.region == "" {
|
|
if val, ok := self.Config["Region"]; ok {
|
|
a.region, _ = val.(string)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NodeName is used to query the Nomad agent for its node name.
|
|
func (a *Agent) NodeName() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.nodeName != "" {
|
|
return a.nodeName, nil
|
|
}
|
|
|
|
// Query the node name
|
|
_, err := a.Self()
|
|
return a.nodeName, err
|
|
}
|
|
|
|
// Datacenter is used to return the name of the datacenter which
|
|
// the agent is a member of.
|
|
func (a *Agent) Datacenter() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.datacenter != "" {
|
|
return a.datacenter, nil
|
|
}
|
|
|
|
// Query the agent for the DC
|
|
_, err := a.Self()
|
|
return a.datacenter, err
|
|
}
|
|
|
|
// Region is used to look up the region the agent is in.
|
|
func (a *Agent) Region() (string, error) {
|
|
// Return from cache if we have it
|
|
if a.region != "" {
|
|
return a.region, nil
|
|
}
|
|
|
|
// Query the agent for the region
|
|
_, err := a.Self()
|
|
return a.region, err
|
|
}
|
|
|
|
// Join is used to instruct a server node to join another server
|
|
// via the gossip protocol. Multiple addresses may be specified.
|
|
// We attempt to join all the hosts in the list. Returns the
|
|
// number of nodes successfully joined and any error. If one or
|
|
// more nodes have a successful result, no error is returned.
|
|
func (a *Agent) Join(addrs ...string) (int, error) {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
// Send the join request
|
|
var resp joinResponse
|
|
_, err := a.client.put("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed joining: %s", err)
|
|
}
|
|
if resp.Error != "" {
|
|
return 0, fmt.Errorf("failed joining: %s", resp.Error)
|
|
}
|
|
return resp.NumJoined, nil
|
|
}
|
|
|
|
// Members is used to query all of the known server members
|
|
func (a *Agent) Members() (*ServerMembers, error) {
|
|
var resp *ServerMembers
|
|
|
|
// Query the known members
|
|
_, err := a.client.query("/v1/agent/members", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// Members is used to query all of the known server members
|
|
// with the ability to set QueryOptions
|
|
func (a *Agent) MembersOpts(opts *QueryOptions) (*ServerMembers, error) {
|
|
var resp *ServerMembers
|
|
_, err := a.client.query("/v1/agent/members", &resp, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// ForceLeave is used to eject an existing node from the cluster.
|
|
func (a *Agent) ForceLeave(node string) error {
|
|
_, err := a.client.put("/v1/agent/force-leave?node="+node, nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// Servers is used to query the list of servers on a client node.
|
|
func (a *Agent) Servers() ([]string, error) {
|
|
var resp []string
|
|
_, err := a.client.query("/v1/agent/servers", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// SetServers is used to update the list of servers on a client node.
|
|
func (a *Agent) SetServers(addrs []string) error {
|
|
// Accumulate the addresses
|
|
v := url.Values{}
|
|
for _, addr := range addrs {
|
|
v.Add("address", addr)
|
|
}
|
|
|
|
_, err := a.client.put("/v1/agent/servers?"+v.Encode(), nil, nil, nil)
|
|
return err
|
|
}
|
|
|
|
// ListKeys returns the list of installed keys
|
|
func (a *Agent) ListKeys() (*KeyringResponse, error) {
|
|
var resp KeyringResponse
|
|
_, err := a.client.query("/v1/agent/keyring/list", &resp, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &resp, nil
|
|
}
|
|
|
|
// InstallKey installs a key in the keyrings of all the serf members
|
|
func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.put("/v1/agent/keyring/install", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// UseKey uses a key from the keyring of serf members
|
|
func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.put("/v1/agent/keyring/use", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// RemoveKey removes a particular key from keyrings of serf members
|
|
func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
|
|
args := KeyringRequest{
|
|
Key: key,
|
|
}
|
|
var resp KeyringResponse
|
|
_, err := a.client.put("/v1/agent/keyring/remove", &args, &resp, nil)
|
|
return &resp, err
|
|
}
|
|
|
|
// Health queries the agent's health
|
|
func (a *Agent) Health() (*AgentHealthResponse, error) {
|
|
req, err := a.client.newRequest("GET", "/v1/agent/health")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var health AgentHealthResponse
|
|
_, resp, err := a.client.doRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Always try to decode the response as JSON
|
|
err = json.NewDecoder(resp.Body).Decode(&health)
|
|
if err == nil {
|
|
return &health, nil
|
|
}
|
|
|
|
// Return custom error when response is not expected JSON format
|
|
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
|
|
}
|
|
|
|
// Host returns debugging context about the agent's host operating system
|
|
func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataResponse, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
if serverID != "" {
|
|
q.Params["server_id"] = serverID
|
|
}
|
|
|
|
if nodeID != "" {
|
|
q.Params["node_id"] = nodeID
|
|
}
|
|
|
|
var resp HostDataResponse
|
|
_, err := a.client.query("/v1/agent/host", &resp, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &resp, nil
|
|
}
|
|
|
|
// Monitor returns a channel which will receive streaming logs from the agent
|
|
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
|
|
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
|
errCh := make(chan error, 1)
|
|
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
r.setQueryOptions(q)
|
|
_, resp, err := requireOK(a.client.doRequest(r))
|
|
if err != nil {
|
|
errCh <- err
|
|
return nil, errCh
|
|
}
|
|
|
|
frames := make(chan *StreamFrame, 10)
|
|
go func() {
|
|
defer resp.Body.Close()
|
|
|
|
dec := json.NewDecoder(resp.Body)
|
|
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
close(frames)
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Decode the next frame
|
|
var frame StreamFrame
|
|
if err := dec.Decode(&frame); err != nil {
|
|
close(frames)
|
|
errCh <- err
|
|
return
|
|
}
|
|
|
|
// Discard heartbeat frame
|
|
if frame.IsHeartbeat() {
|
|
continue
|
|
}
|
|
|
|
frames <- &frame
|
|
}
|
|
}()
|
|
|
|
return frames, errCh
|
|
}
|
|
|
|
// PprofOptions contain a set of parameters for profiling a node or server.
|
|
type PprofOptions struct {
|
|
// ServerID is the server ID, name, or special value "leader" to
|
|
// specify the server that a given profile should be run on.
|
|
ServerID string
|
|
|
|
// NodeID is the node ID that a given profile should be run on.
|
|
NodeID string
|
|
|
|
// Seconds specifies the amount of time a profile should be run for.
|
|
// Seconds only applies for certain runtime profiles like CPU and Trace.
|
|
Seconds int
|
|
|
|
// GC determines if a runtime.GC() should be called before a heap
|
|
// profile.
|
|
GC int
|
|
|
|
// Debug specifies if the output of a lookup profile should be returned
|
|
// in human readable format instead of binary.
|
|
Debug int
|
|
}
|
|
|
|
// CPUProfile returns a runtime/pprof cpu profile for a given server or node.
|
|
// The profile will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest("profile", opts, q)
|
|
}
|
|
|
|
// Trace returns a runtime/pprof trace for a given server or node.
|
|
// The trace will run for the amount of seconds passed in or default to 1.
|
|
// If no serverID or nodeID are provided the current Agents server will be
|
|
// used.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile.
|
|
func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest("trace", opts, q)
|
|
}
|
|
|
|
// Lookup returns a runtime/pprof profile using pprof.Lookup to determine
|
|
// which profile to run. Accepts a client or server ID but not both simultaneously.
|
|
//
|
|
// The call blocks until the profile finishes, and returns the raw bytes of the
|
|
// profile unless debug is set.
|
|
func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
return a.pprofRequest(profile, opts, q)
|
|
}
|
|
|
|
func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
|
|
if q == nil {
|
|
q = &QueryOptions{}
|
|
}
|
|
if q.Params == nil {
|
|
q.Params = make(map[string]string)
|
|
}
|
|
|
|
q.Params["seconds"] = strconv.Itoa(opts.Seconds)
|
|
q.Params["debug"] = strconv.Itoa(opts.Debug)
|
|
q.Params["gc"] = strconv.Itoa(opts.GC)
|
|
q.Params["node_id"] = opts.NodeID
|
|
q.Params["server_id"] = opts.ServerID
|
|
|
|
body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := io.ReadAll(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// joinResponse is used to decode the response we get while
|
|
// sending a member join request.
|
|
type joinResponse struct {
|
|
NumJoined int `json:"num_joined"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
type ServerMembers struct {
|
|
ServerName string
|
|
ServerRegion string
|
|
ServerDC string
|
|
Members []*AgentMember
|
|
}
|
|
|
|
type AgentSelf struct {
|
|
Config map[string]interface{} `json:"config"`
|
|
Member AgentMember `json:"member"`
|
|
Stats map[string]map[string]string `json:"stats"`
|
|
}
|
|
|
|
// AgentMember represents a cluster member known to the agent
|
|
type AgentMember struct {
|
|
Name string
|
|
Addr string
|
|
Port uint16
|
|
Tags map[string]string
|
|
Status string
|
|
ProtocolMin uint8
|
|
ProtocolMax uint8
|
|
ProtocolCur uint8
|
|
DelegateMin uint8
|
|
DelegateMax uint8
|
|
DelegateCur uint8
|
|
}
|
|
|
|
// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort
|
|
// based on the Name, DC and Region
|
|
type AgentMembersNameSort []*AgentMember
|
|
|
|
func (a AgentMembersNameSort) Len() int { return len(a) }
|
|
func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
func (a AgentMembersNameSort) Less(i, j int) bool {
|
|
if a[i].Tags["region"] != a[j].Tags["region"] {
|
|
return a[i].Tags["region"] < a[j].Tags["region"]
|
|
}
|
|
|
|
if a[i].Tags["dc"] != a[j].Tags["dc"] {
|
|
return a[i].Tags["dc"] < a[j].Tags["dc"]
|
|
}
|
|
|
|
return a[i].Name < a[j].Name
|
|
|
|
}
|
|
|
|
// AgentHealthResponse is the response from the Health endpoint describing an
|
|
// agent's health.
|
|
type AgentHealthResponse struct {
|
|
Client *AgentHealth `json:"client,omitempty"`
|
|
Server *AgentHealth `json:"server,omitempty"`
|
|
}
|
|
|
|
// AgentHealth describes the Client or Server's health in a Health request.
|
|
type AgentHealth struct {
|
|
// Ok is false if the agent is unhealthy
|
|
Ok bool `json:"ok"`
|
|
|
|
// Message describes why the agent is unhealthy
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type HostData struct {
|
|
OS string
|
|
Network []map[string]string
|
|
ResolvConf string
|
|
Hosts string
|
|
Environment map[string]string
|
|
Disk map[string]DiskUsage
|
|
}
|
|
|
|
type DiskUsage struct {
|
|
DiskMB int64
|
|
UsedMB int64
|
|
}
|
|
|
|
type HostDataResponse struct {
|
|
AgentID string
|
|
HostData *HostData `json:",omitempty"`
|
|
}
|
|
|
|
// GetSchedulerWorkerConfig returns the targeted agent's worker pool configuration
|
|
func (a *Agent) GetSchedulerWorkerConfig(q *QueryOptions) (*SchedulerWorkerPoolArgs, error) {
|
|
var resp AgentSchedulerWorkerConfigResponse
|
|
_, err := a.client.query("/v1/agent/schedulers/config", &resp, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
|
|
}
|
|
|
|
// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration
|
|
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs, q *WriteOptions) (*SchedulerWorkerPoolArgs, error) {
|
|
req := AgentSchedulerWorkerConfigRequest(args)
|
|
var resp AgentSchedulerWorkerConfigResponse
|
|
|
|
_, err := a.client.put("/v1/agent/schedulers/config", &req, &resp, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
|
|
}
|
|
|
|
type SchedulerWorkerPoolArgs struct {
|
|
NumSchedulers int
|
|
EnabledSchedulers []string
|
|
}
|
|
|
|
// AgentSchedulerWorkerConfigRequest is used to provide new scheduler worker configuration
|
|
// to a specific Nomad server. EnabledSchedulers must contain at least the `_core` scheduler
|
|
// to be valid.
|
|
type AgentSchedulerWorkerConfigRequest struct {
|
|
NumSchedulers int `json:"num_schedulers"`
|
|
EnabledSchedulers []string `json:"enabled_schedulers"`
|
|
}
|
|
|
|
// AgentSchedulerWorkerConfigResponse contains the Nomad server's current running configuration
|
|
// as well as the server's id as a convenience. This can be used to provide starting values for
|
|
// creating an AgentSchedulerWorkerConfigRequest to make changes to the running configuration.
|
|
type AgentSchedulerWorkerConfigResponse struct {
|
|
ServerID string `json:"server_id"`
|
|
NumSchedulers int `json:"num_schedulers"`
|
|
EnabledSchedulers []string `json:"enabled_schedulers"`
|
|
}
|
|
|
|
// GetSchedulerWorkersInfo returns the current status of all of the scheduler workers on
|
|
// a Nomad server.
|
|
func (a *Agent) GetSchedulerWorkersInfo(q *QueryOptions) (*AgentSchedulerWorkersInfo, error) {
|
|
var out *AgentSchedulerWorkersInfo
|
|
|
|
_, err := a.client.query("/v1/agent/schedulers", &out, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// AgentSchedulerWorkersInfo is the response from the scheduler information endpoint containing
|
|
// a detailed status of each scheduler worker running on the server.
|
|
type AgentSchedulerWorkersInfo struct {
|
|
ServerID string `json:"server_id"`
|
|
Schedulers []AgentSchedulerWorkerInfo `json:"schedulers"`
|
|
}
|
|
|
|
// AgentSchedulerWorkerInfo holds the detailed status information for a single scheduler worker.
|
|
type AgentSchedulerWorkerInfo struct {
|
|
ID string `json:"id"`
|
|
EnabledSchedulers []string `json:"enabled_schedulers"`
|
|
Started string `json:"started"`
|
|
Status string `json:"status"`
|
|
WorkloadStatus string `json:"workload_status"`
|
|
}
|