package api

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// Agent encapsulates an API client which talks to Nomad's
// agent endpoints for a specific node.
type Agent struct {
	client *Client

	// Cached static agent info. These fields are filled in by
	// populateCache after a successful call to Self and are only
	// written while they are still empty.
	nodeName   string
	datacenter string
	region     string
}

// KeyringResponse is a unified key response and can be used for install,
// remove, use, as well as listing key queries.
type KeyringResponse struct {
	Messages map[string]string
	Keys     map[string]int
	NumNodes int
}

// KeyringRequest is the request object for serf key operations.
type KeyringRequest struct {
	Key string
}

// Agent returns a new agent which can be used to query
// the agent-specific endpoints.
func (c *Client) Agent() *Agent {
	return &Agent{client: c}
}

// Self is used to query the /v1/agent/self endpoint and
// returns information specific to the running agent.
//
// On success the node name, datacenter and region found in the
// response are cached on the Agent handle. An error is returned if
// the query fails or if it succeeds without producing a response
// object.
func (a *Agent) Self() (*AgentSelf, error) {
	var out *AgentSelf

	// Query the self endpoint on the agent
	_, err := a.client.query("/v1/agent/self", &out, nil)
	if err != nil {
		return nil, fmt.Errorf("failed querying self endpoint: %s", err)
	}

	// A body that decodes to nothing (for example JSON null) leaves out
	// nil without an error; report it instead of dereferencing it below.
	if out == nil {
		return nil, fmt.Errorf("failed querying self endpoint: empty response")
	}

	// Populate the cache for faster queries
	a.populateCache(out)

	return out, nil
}

// populateCache is used to insert various pieces of static
// data into the agent handle. This is used during subsequent
// lookups for the same data later on to save the round trip.
//
// Only fields that are still empty are written. A nil self is ignored.
func (a *Agent) populateCache(self *AgentSelf) {
	if self == nil {
		return
	}
	if a.nodeName == "" {
		a.nodeName = self.Member.Name
	}
	if a.datacenter == "" {
		if val, ok := self.Config["Datacenter"]; ok {
			// A non-string value leaves the cache entry empty.
			a.datacenter, _ = val.(string)
		}
	}
	if a.region == "" {
		if val, ok := self.Config["Region"]; ok {
			// A non-string value leaves the cache entry empty.
			a.region, _ = val.(string)
		}
	}
}

// NodeName is used to query the Nomad agent for its node name.
func (a *Agent) NodeName() (string, error) { // Return from cache if we have it if a.nodeName != "" { return a.nodeName, nil } // Query the node name _, err := a.Self() return a.nodeName, err } // Datacenter is used to return the name of the datacenter which // the agent is a member of. func (a *Agent) Datacenter() (string, error) { // Return from cache if we have it if a.datacenter != "" { return a.datacenter, nil } // Query the agent for the DC _, err := a.Self() return a.datacenter, err } // Region is used to look up the region the agent is in. func (a *Agent) Region() (string, error) { // Return from cache if we have it if a.region != "" { return a.region, nil } // Query the agent for the region _, err := a.Self() return a.region, err } // Join is used to instruct a server node to join another server // via the gossip protocol. Multiple addresses may be specified. // We attempt to join all of the hosts in the list. Returns the // number of nodes successfully joined and any error. If one or // more nodes have a successful result, no error is returned. func (a *Agent) Join(addrs ...string) (int, error) { // Accumulate the addresses v := url.Values{} for _, addr := range addrs { v.Add("address", addr) } // Send the join request var resp joinResponse _, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil) if err != nil { return 0, fmt.Errorf("failed joining: %s", err) } if resp.Error != "" { return 0, fmt.Errorf("failed joining: %s", resp.Error) } return resp.NumJoined, nil } // Members is used to query all of the known server members func (a *Agent) Members() (*ServerMembers, error) { var resp *ServerMembers // Query the known members _, err := a.client.query("/v1/agent/members", &resp, nil) if err != nil { return nil, err } return resp, nil } // ForceLeave is used to eject an existing node from the cluster. 
func (a *Agent) ForceLeave(node string) error { _, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil) return err } // Servers is used to query the list of servers on a client node. func (a *Agent) Servers() ([]string, error) { var resp []string _, err := a.client.query("/v1/agent/servers", &resp, nil) if err != nil { return nil, err } return resp, nil } // SetServers is used to update the list of servers on a client node. func (a *Agent) SetServers(addrs []string) error { // Accumulate the addresses v := url.Values{} for _, addr := range addrs { v.Add("address", addr) } _, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil) return err } // ListKeys returns the list of installed keys func (a *Agent) ListKeys() (*KeyringResponse, error) { var resp KeyringResponse _, err := a.client.query("/v1/agent/keyring/list", &resp, nil) if err != nil { return nil, err } return &resp, nil } // InstallKey installs a key in the keyrings of all the serf members func (a *Agent) InstallKey(key string) (*KeyringResponse, error) { args := KeyringRequest{ Key: key, } var resp KeyringResponse _, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil) return &resp, err } // UseKey uses a key from the keyring of serf members func (a *Agent) UseKey(key string) (*KeyringResponse, error) { args := KeyringRequest{ Key: key, } var resp KeyringResponse _, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil) return &resp, err } // RemoveKey removes a particular key from keyrings of serf members func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) { args := KeyringRequest{ Key: key, } var resp KeyringResponse _, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil) return &resp, err } // Health queries the agent's health func (a *Agent) Health() (*AgentHealthResponse, error) { req, err := a.client.newRequest("GET", "/v1/agent/health") if err != nil { return nil, err } var health AgentHealthResponse _, resp, 
err := a.client.doRequest(req) if err != nil { return nil, err } defer resp.Body.Close() // Always try to decode the response as JSON err = json.NewDecoder(resp.Body).Decode(&health) if err == nil { return &health, nil } // Return custom error when response is not expected JSON format return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err) } // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { errCh := make(chan error, 1) r, err := a.client.newRequest("GET", "/v1/agent/monitor") if err != nil { errCh <- err return nil, errCh } r.setQueryOptions(q) _, resp, err := requireOK(a.client.doRequest(r)) if err != nil { errCh <- err return nil, errCh } frames := make(chan *StreamFrame, 10) go func() { defer resp.Body.Close() dec := json.NewDecoder(resp.Body) for { select { case <-stopCh: return default: } // Decode the next frame var frame StreamFrame if err := dec.Decode(&frame); err != nil { close(frames) errCh <- err return } // Discard heartbeat frame if frame.IsHeartbeat() { continue } frames <- &frame } }() return frames, errCh } // joinResponse is used to decode the response we get while // sending a member join request. 
type joinResponse struct {
	NumJoined int    `json:"num_joined"`
	Error     string `json:"error"`
}

// ServerMembers is the value decoded from the /v1/agent/members
// endpoint: the answering server's name, region and datacenter, plus
// the members it knows about.
type ServerMembers struct {
	ServerName   string
	ServerRegion string
	ServerDC     string
	Members      []*AgentMember
}

// AgentSelf is the value decoded from the /v1/agent/self endpoint.
type AgentSelf struct {
	// Config holds the agent configuration as loosely typed values.
	// The "Datacenter" and "Region" keys are read from it as strings
	// when the Agent handle caches its static info.
	Config map[string]interface{} `json:"config"`

	// Member describes the agent itself; its Name is cached as the
	// node name.
	Member AgentMember `json:"member"`

	// Stats is a two-level map of string statistics.
	// NOTE(review): presumably keyed by subsystem, then by stat name —
	// confirm against the agent.
	Stats map[string]map[string]string `json:"stats"`
}

// AgentMember represents a cluster member known to the agent
type AgentMember struct {
	Name string
	Addr string
	Port uint16

	// Tags carries free-form member metadata. The "region" and "dc"
	// entries are the ones AgentMembersNameSort orders by.
	Tags   map[string]string
	Status string

	// NOTE(review): these look like gossip protocol and delegate
	// version bounds (min/max/current) — confirm against the server.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}

// AgentMembersNameSort implements sort.Interface for []*AgentMember.
// Members are ordered by the "region" tag, then by the "dc" tag, and
// finally by Name. A missing tag compares as the empty string.
type AgentMembersNameSort []*AgentMember

// Len reports the number of members in the collection.
func (a AgentMembersNameSort) Len() int {
	return len(a)
}

// Swap exchanges the members at positions i and j.
func (a AgentMembersNameSort) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the member at i sorts before the member at j,
// comparing region first, then datacenter, then name.
func (a AgentMembersNameSort) Less(i, j int) bool {
	left, right := a[i], a[j]

	if lr, rr := left.Tags["region"], right.Tags["region"]; lr != rr {
		return lr < rr
	}
	if ld, rd := left.Tags["dc"], right.Tags["dc"]; ld != rd {
		return ld < rd
	}
	return left.Name < right.Name
}

// AgentHealthResponse is the response from the Health endpoint describing an
// agent's health.
type AgentHealthResponse struct {
	Client *AgentHealth `json:"client,omitempty"`
	Server *AgentHealth `json:"server,omitempty"`
}

// AgentHealth describes the Client or Server's health in a Health request.
type AgentHealth struct {
	// Ok is false if the agent is unhealthy
	Ok bool `json:"ok"`

	// Message describes why the agent is unhealthy
	Message string `json:"message"`
}