package rabbithole

import (
	"encoding/json"
	"net/http"
	"net/url"
)

// BackingQueueStatus exposes backing queue (queue storage engine) metrics.
// They can change in a future version of RabbitMQ.
type BackingQueueStatus struct {
	// Q1 through Q4 are decoded from the "q1".."q4" keys.
	// NOTE(review): presumably sizes of the storage engine's internal
	// sub-queues — confirm against the RabbitMQ documentation.
	Q1 int `json:"q1"`
	Q2 int `json:"q2"`
	Q3 int `json:"q3"`
	Q4 int `json:"q4"`
	// Total queue length
	Length int64 `json:"len"`
	// Number of pending acks from consumers
	PendingAcks int64 `json:"pending_acks"`
	// Number of messages held in RAM
	RAMMessageCount int64 `json:"ram_msg_count"`
	// Number of outstanding acks held in RAM
	RAMAckCount int64 `json:"ram_ack_count"`
	// Number of persistent messages in the store
	PersistentCount int64 `json:"persistent_count"`
	// Average ingress (inbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageIngressRate float64 `json:"avg_ingress_rate"`
	// Average egress (outbound) rate, not including messages
	// that go straight through to auto-acking consumers.
	AverageEgressRate float64 `json:"avg_egress_rate"`
	// Rate at which unacknowledged message records enter RAM,
	// e.g. because messages are delivered requiring acknowledgement.
	// Note: decoded as float32, unlike the float64 ingress/egress rates above.
	AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
	// Rate at which unacknowledged message records leave RAM,
	// e.g. because acks arrive or unacked messages are paged out.
	// Note: decoded as float32, unlike the float64 ingress/egress rates above.
	AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}

// OwnerPidDetails describes an exclusive queue owner (connection).
type OwnerPidDetails struct {
	// Name is decoded from the "name" key.
	// NOTE(review): presumably the connection name, e.g.
	// "127.0.0.1:46928 -> 127.0.0.1:5672" — confirm.
	Name string `json:"name"`
	// PeerPort is decoded from the "peer_port" key (Port is declared elsewhere in the package).
	PeerPort Port `json:"peer_port"`
	// PeerHost is decoded from the "peer_host" key.
	PeerHost string `json:"peer_host"`
}

// QueueInfo represents a queue, its properties and key metrics.
type QueueInfo struct {
	// Queue name
	Name string `json:"name"`
	// Virtual host this queue belongs to
	Vhost string `json:"vhost"`
	// Is this queue durable?
	Durable bool `json:"durable"`
	// Is this queue auto-deleted?
	AutoDelete bool `json:"auto_delete"`
	// Extra queue arguments
	Arguments map[string]interface{} `json:"arguments"`

	// RabbitMQ node that hosts master for this queue
	Node string `json:"node"`
	// Queue status (decoded from the "state" key)
	Status string `json:"state"`

	// Total amount of RAM used by this queue
	Memory int64 `json:"memory"`
	// How many consumers this queue has
	Consumers int `json:"consumers"`
	// Utilisation of all the consumers
	ConsumerUtilisation float64 `json:"consumer_utilisation"`
	// If there is an exclusive consumer, its consumer tag
	ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`

	// Policy applied to this queue, if any
	Policy string `json:"policy"`

	// Total bytes of messages in this queue (decoded from "message_bytes"),
	// followed by the "message_bytes_persistent" and "message_bytes_ram" values.
	MessagesBytes           int64 `json:"message_bytes"`
	MessagesBytesPersistent int64 `json:"message_bytes_persistent"`
	MessagesBytesRAM        int64 `json:"message_bytes_ram"`

	// Total number of messages in this queue, its rate details, and the
	// "messages_persistent" and "messages_ram" counts.
	Messages           int         `json:"messages"`
	MessagesDetails    RateDetails `json:"messages_details"`
	MessagesPersistent int         `json:"messages_persistent"`
	MessagesRAM        int         `json:"messages_ram"`

	// Number of messages ready to be delivered
	MessagesReady        int         `json:"messages_ready"`
	MessagesReadyDetails RateDetails `json:"messages_ready_details"`

	// Number of messages delivered and pending acknowledgements from consumers
	MessagesUnacknowledged        int         `json:"messages_unacknowledged"`
	MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`

	// Decoded from the "message_stats" object (MessageStats is declared elsewhere in the package).
	MessageStats MessageStats `json:"message_stats"`

	// Exclusive queue owner (connection) details, decoded from "owner_pid_details".
	OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`

	// Backing queue (queue storage engine) metrics, decoded from "backing_queue_status".
	BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`

	// Decoded from "active_consumers".
	// NOTE(review): how this differs from Consumers is not visible here — confirm.
	ActiveConsumers int64 `json:"active_consumers"`
}

// PagedQueueInfo is additional context returned for paginated requests.
type PagedQueueInfo struct {
	// Pagination counters, decoded from the correspondingly named JSON keys.
	Page          int `json:"page"`
	PageCount     int `json:"page_count"`
	PageSize      int `json:"page_size"`
	FilteredCount int `json:"filtered_count"`
	ItemCount     int `json:"item_count"`
	TotalCount    int `json:"total_count"`
	// Items holds the queue records decoded from the "items" key.
	Items []QueueInfo `json:"items"`
}

// DetailedQueueInfo is a distinct defined type with the same underlying
// structure as QueueInfo (it is not a Go type alias, so values need an
// explicit conversion between the two).
type DetailedQueueInfo QueueInfo

//
// GET /api/queues
//

// [
//   {
//     "owner_pid_details": {
//       "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
//       "peer_port": 46928,
//       "peer_host": "127.0.0.1"
//     },
//     "message_stats": {
//       "publish": 19830,
//       "publish_details": {
//         "rate": 5
//       }
//     },
//     "messages": 15,
//     "messages_details": {
//       "rate": 0
//     },
//     "messages_ready": 15,
//     "messages_ready_details": {
//       "rate": 0
//     },
//     "messages_unacknowledged": 0,
//     "messages_unacknowledged_details": {
//       "rate": 0
//     },
//     "policy": "",
//     "exclusive_consumer_tag": "",
//     "consumers": 0,
//     "memory": 143112,
//     "backing_queue_status": {
//       "q1": 0,
//       "q2": 0,
//       "delta": [
//         "delta",
//         "undefined",
//         0,
//         "undefined"
//       ],
//       "q3": 0,
//       "q4": 15,
//       "len": 15,
//       "pending_acks": 0,
//       "target_ram_count": "infinity",
//       "ram_msg_count": 15,
//       "ram_ack_count": 0,
//       "next_seq_id": 19830,
//       "persistent_count": 0,
//       "avg_ingress_rate": 4.9920127795527,
//       "avg_egress_rate": 4.9920127795527,
//       "avg_ack_ingress_rate": 0,
//       "avg_ack_egress_rate": 0
//     },
//     "status": "running",
//     "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
//     "vhost": "rabbit\/hole",
//     "durable": false,
//     "auto_delete": false,
//     "arguments": {
//       "x-message-ttl": 5000
//     },
//     "node": "rabbit@marzo"
//   }
// ]
//
// NOTE(review): the sample above shows a "status" key, while QueueInfo.Status
// is decoded from the "state" key — the sample may be outdated; confirm
// against the server version in use.

// ListQueues lists all queues in the cluster. This only includes queues in the
// virtual hosts accessible to the user.
func (c *Client) ListQueues() (rec []QueueInfo, err error) { req, err := newGETRequest(c, "queues") if err != nil { return []QueueInfo{}, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return []QueueInfo{}, err } return rec, nil } // ListQueuesWithParameters lists queues with a list of query string values. func (c *Client) ListQueuesWithParameters(params url.Values) (rec []QueueInfo, err error) { req, err := newGETRequestWithParameters(c, "queues", params) if err != nil { return []QueueInfo{}, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return []QueueInfo{}, err } return rec, nil } // PagedListQueuesWithParameters lists queues with pagination. func (c *Client) PagedListQueuesWithParameters(params url.Values) (rec PagedQueueInfo, err error) { req, err := newGETRequestWithParameters(c, "queues", params) if err != nil { return PagedQueueInfo{}, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return PagedQueueInfo{}, err } return rec, nil } // // GET /api/queues/{vhost} // // ListQueuesIn lists all queues in a virtual host. func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) { req, err := newGETRequest(c, "queues/"+url.PathEscape(vhost)) if err != nil { return []QueueInfo{}, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return []QueueInfo{}, err } return rec, nil } // // GET /api/queues/{vhost}/{name} // // GetQueue returns information about a queue. func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) { req, err := newGETRequest(c, "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue)) if err != nil { return nil, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return nil, err } return rec, nil } // // GET /api/queues/{vhost}/{name}?{query} // GetQueueWithParameters returns information about a queue. Compared to the regular GetQueue function, // this one accepts additional query string values. 
func (c *Client) GetQueueWithParameters(vhost, queue string, qs url.Values) (rec *DetailedQueueInfo, err error) { req, err := newGETRequestWithParameters(c, "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue), qs) if err != nil { return nil, err } if err = executeAndParseRequest(c, req, &rec); err != nil { return nil, err } return rec, nil } // // PUT /api/exchanges/{vhost}/{exchange} // // QueueSettings represents queue properties. Use it to declare a queue. type QueueSettings struct { Type string `json:"type"` Durable bool `json:"durable"` AutoDelete bool `json:"auto_delete,omitempty"` Arguments map[string]interface{} `json:"arguments,omitempty"` } // DeclareQueue declares a queue. func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) { if info.Arguments == nil { info.Arguments = make(map[string]interface{}) } body, err := json.Marshal(info) if err != nil { return nil, err } req, err := newRequestWithBody(c, "PUT", "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue), body) if err != nil { return nil, err } if res, err = executeRequest(c, req); err != nil { return nil, err } return res, nil } // // DELETE /api/queues/{vhost}/{name} // // DeleteQueue deletes a queue. func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) { req, err := newRequestWithBody(c, "DELETE", "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue), nil) if err != nil { return nil, err } if res, err = executeRequest(c, req); err != nil { return nil, err } return res, nil } // // DELETE /api/queues/{vhost}/{name}/contents // // PurgeQueue purges a queue (deletes all messages ready for delivery in it). 
func (c *Client) PurgeQueue(vhost, queue string) (res *http.Response, err error) { req, err := newRequestWithBody(c, "DELETE", "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue)+"/contents", nil) if err != nil { return nil, err } if res, err = executeRequest(c, req); err != nil { return nil, err } return res, nil } // queueAction represents an action that can be performed on a queue (sync/cancel_sync) type queueAction struct { Action string `json:"action"` } // SyncQueue synchronises queue contents with the mirrors remaining in the cluster. func (c *Client) SyncQueue(vhost, queue string) (res *http.Response, err error) { return c.sendQueueAction(vhost, queue, queueAction{"sync"}) } // CancelSyncQueue cancels queue synchronisation process. func (c *Client) CancelSyncQueue(vhost, queue string) (res *http.Response, err error) { return c.sendQueueAction(vhost, queue, queueAction{"cancel_sync"}) } // // POST /api/queues/{vhost}/{name}/actions // func (c *Client) sendQueueAction(vhost string, queue string, action queueAction) (res *http.Response, err error) { body, err := json.Marshal(action) if err != nil { return nil, err } req, err := newRequestWithBody(c, "POST", "queues/"+url.PathEscape(vhost)+"/"+url.PathEscape(queue)+"/actions", body) if err != nil { return nil, err } if res, err = executeRequest(c, req); err != nil { return nil, err } return res, nil }