314 lines
8 KiB
Go
314 lines
8 KiB
Go
package rabbithole
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/url"
|
|
)
|
|
|
|
// BackingQueueStatus holds information about the backing queue
// (queue storage engine) of a queue. It is decoded from the
// "backing_queue_status" object of a queue description.
type BackingQueueStatus struct {
	Q1 int `json:"q1"`
	Q2 int `json:"q2"`
	Q3 int `json:"q3"`
	Q4 int `json:"q4"`
	// Total queue length
	Length int64 `json:"len"`
	// Number of pending acks from consumers
	PendingAcks int64 `json:"pending_acks"`
	// Number of messages held in RAM
	RAMMessageCount int64 `json:"ram_msg_count"`
	// Number of outstanding acks held in RAM
	RAMAckCount int64 `json:"ram_ack_count"`
	// Number of persistent messages in the store
	PersistentCount int64 `json:"persistent_count"`
	// Average ingress (inbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageIngressRate float64 `json:"avg_ingress_rate"`
	// Average egress (outbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageEgressRate float64 `json:"avg_egress_rate"`
	// Rate at which unacknowledged message records enter RAM,
	// e.g. because messages are delivered requiring acknowledgement.
	//
	// NOTE: this is float32 while the ingress/egress rates above are
	// float64; the type is kept as is for API compatibility.
	AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
	// Rate at which unacknowledged message records leave RAM,
	// e.g. because acks arrive or unacked messages are paged out.
	//
	// NOTE: float32, see AverageAckIngressRate.
	AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}
|
|
|
|
type OwnerPidDetails struct {
|
|
Name string `json:"name"`
|
|
PeerPort Port `json:"peer_port"`
|
|
PeerHost string `json:"peer_host"`
|
|
}
|
|
|
|
type QueueInfo struct {
|
|
// Queue name
|
|
Name string `json:"name"`
|
|
// Virtual host this queue belongs to
|
|
Vhost string `json:"vhost"`
|
|
// Is this queue durable?
|
|
Durable bool `json:"durable"`
|
|
// Is this queue auto-delted?
|
|
AutoDelete bool `json:"auto_delete"`
|
|
// Extra queue arguments
|
|
Arguments map[string]interface{} `json:"arguments"`
|
|
|
|
// RabbitMQ node that hosts master for this queue
|
|
Node string `json:"node"`
|
|
// Queue status
|
|
Status string `json:"status"`
|
|
|
|
// Total amount of RAM used by this queue
|
|
Memory int64 `json:"memory"`
|
|
// How many consumers this queue has
|
|
Consumers int `json:"consumers"`
|
|
// Utilisation of all the consumers
|
|
ConsumerUtilisation float64 `json:"consumer_utilisation"`
|
|
// If there is an exclusive consumer, its consumer tag
|
|
ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`
|
|
|
|
// Policy applied to this queue, if any
|
|
Policy string `json:"policy"`
|
|
|
|
// Total bytes of messages in this queues
|
|
MessagesBytes int64 `json:"message_bytes"`
|
|
MessagesBytesPersistent int64 `json:"message_bytes_persistent"`
|
|
MessagesBytesRAM int64 `json:"message_bytes_ram"`
|
|
|
|
// Total number of messages in this queue
|
|
Messages int `json:"messages"`
|
|
MessagesDetails RateDetails `json:"messages_details"`
|
|
MessagesPersistent int `json:"messages_persistent"`
|
|
MessagesRAM int `json:"messages_ram"`
|
|
|
|
// Number of messages ready to be delivered
|
|
MessagesReady int `json:"messages_ready"`
|
|
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
|
|
|
|
// Number of messages delivered and pending acknowledgements from consumers
|
|
MessagesUnacknowledged int `json:"messages_unacknowledged"`
|
|
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
|
|
|
|
MessageStats MessageStats `json:"message_stats"`
|
|
|
|
OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`
|
|
|
|
BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`
|
|
}
|
|
|
|
// DetailedQueueInfo is a defined type whose underlying type is QueueInfo.
// It is the result type of GetQueue and GetQueueWithParameters.
type DetailedQueueInfo QueueInfo
|
|
|
|
//
// GET /api/queues
//

// [
//   {
//     "owner_pid_details": {
//       "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
//       "peer_port": 46928,
//       "peer_host": "127.0.0.1"
//     },
//     "message_stats": {
//       "publish": 19830,
//       "publish_details": {
//         "rate": 5
//       }
//     },
//     "messages": 15,
//     "messages_details": {
//       "rate": 0
//     },
//     "messages_ready": 15,
//     "messages_ready_details": {
//       "rate": 0
//     },
//     "messages_unacknowledged": 0,
//     "messages_unacknowledged_details": {
//       "rate": 0
//     },
//     "policy": "",
//     "exclusive_consumer_tag": "",
//     "consumers": 0,
//     "memory": 143112,
//     "backing_queue_status": {
//       "q1": 0,
//       "q2": 0,
//       "delta": [
//         "delta",
//         "undefined",
//         0,
//         "undefined"
//       ],
//       "q3": 0,
//       "q4": 15,
//       "len": 15,
//       "pending_acks": 0,
//       "target_ram_count": "infinity",
//       "ram_msg_count": 15,
//       "ram_ack_count": 0,
//       "next_seq_id": 19830,
//       "persistent_count": 0,
//       "avg_ingress_rate": 4.9920127795527,
//       "avg_egress_rate": 4.9920127795527,
//       "avg_ack_ingress_rate": 0,
//       "avg_ack_egress_rate": 0
//     },
//     "status": "running",
//     "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
//     "vhost": "rabbit\/hole",
//     "durable": false,
//     "auto_delete": false,
//     "arguments": {
//       "x-message-ttl": 5000
//     },
//     "node": "rabbit@marzo"
//   }
// ]
|
|
|
|
func (c *Client) ListQueues() (rec []QueueInfo, err error) {
|
|
req, err := newGETRequest(c, "queues")
|
|
if err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
func (c *Client) ListQueuesWithParameters(params url.Values) (rec []QueueInfo, err error) {
|
|
req, err := newGETRequestWithParameters(c, "queues", params)
|
|
if err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
//
|
|
// GET /api/queues/{vhost}
|
|
//
|
|
|
|
func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) {
|
|
req, err := newGETRequest(c, "queues/"+PathEscape(vhost))
|
|
if err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
|
return []QueueInfo{}, err
|
|
}
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
//
|
|
// GET /api/queues/{vhost}/{name}
|
|
//
|
|
|
|
func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) {
|
|
req, err := newGETRequest(c, "queues/"+PathEscape(vhost)+"/"+PathEscape(queue))
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
//
|
|
// GET /api/queues/{vhost}/{name}?{query}
|
|
|
|
func (c *Client) GetQueueWithParameters(vhost, queue string, qs url.Values) (rec *DetailedQueueInfo, err error) {
|
|
req, err := newGETRequestWithParameters(c, "queues/"+PathEscape(vhost)+"/"+PathEscape(queue), qs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = executeAndParseRequest(c, req, &rec); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return rec, nil
|
|
}
|
|
|
|
//
// PUT /api/queues/{vhost}/{name}
//
|
|
|
|
// QueueSettings is the JSON request body sent by DeclareQueue when a
// queue is declared.
type QueueSettings struct {
	Durable    bool                   `json:"durable"`
	AutoDelete bool                   `json:"auto_delete"`
	Arguments  map[string]interface{} `json:"arguments"`
}
|
|
|
|
func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) {
|
|
if info.Arguments == nil {
|
|
info.Arguments = make(map[string]interface{})
|
|
}
|
|
body, err := json.Marshal(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req, err := newRequestWithBody(c, "PUT", "queues/"+PathEscape(vhost)+"/"+PathEscape(queue), body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err = executeRequest(c, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
//
|
|
// DELETE /api/queues/{vhost}/{name}
|
|
//
|
|
|
|
func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) {
|
|
req, err := newRequestWithBody(c, "DELETE", "queues/"+PathEscape(vhost)+"/"+PathEscape(queue), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err = executeRequest(c, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
//
|
|
// DELETE /api/queues/{vhost}/{name}/contents
|
|
//
|
|
|
|
func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error) {
|
|
req, err := newRequestWithBody(c, "DELETE", "queues/"+PathEscape(vhost)+"/"+PathEscape(queue)+"/contents", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err = executeRequest(c, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|