add rabbit-hole dep

This commit is contained in:
Kevin Pike 2015-11-18 21:45:05 -08:00
parent 34dcbe176e
commit aa15145918
22 changed files with 2600 additions and 0 deletions

View File

@ -0,0 +1,24 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
src/github.com
src/code.google.com

View File

@ -0,0 +1,17 @@
language: go
go:
- 1.3
- 1.4
- tip
services:
- rabbitmq
before_script: ./bin/ci/before_build.sh
script: make
matrix:
allow_failures:
- go: tip
notifications:
email:
- michael@rabbitmq.com

View File

@ -0,0 +1,15 @@
## Will be in 1.0 (First Public Release)
### TLS Support
`rabbithole.NewTLSClient` is a new function which works
much like `NewClient` but additionally accepts a transport.
Contributed by @GrimTheReaper.
### Federation Support
It is now possible to create federation links
over HTTP API.
Contributed by Ryan Grenz.

View File

@ -0,0 +1,23 @@
Copyright (c) 2013, Michael Klishin
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,24 @@
export GOPATH := $(CURDIR)
COVER_FILE := coverage
all: test
.PHONY: test
test: install-dependencies
go test -v
cover: install-dependencies install-cover
go test -v -test.coverprofile="$(COVER_FILE).prof"
sed -i.bak 's|_'$(GOPATH)'|.|g' $(COVER_FILE).prof
go tool cover -html=$(COVER_FILE).prof -o $(COVER_FILE).html
rm $(COVER_FILE).prof*
install-cover:
go get code.google.com/p/go.tools/cmd/cover
install-dependencies:
go get github.com/onsi/ginkgo
go get github.com/onsi/gomega
go get github.com/streadway/amqp

View File

@ -0,0 +1,293 @@
# Rabbit Hole, a RabbitMQ HTTP API Client for Go
This library is a [RabbitMQ HTTP API](http://hg.rabbitmq.com/rabbitmq-management/raw-file/450b7ea22cfd/priv/www/api/index.html) client for the Go language.
## Supported Go Versions
Rabbit Hole requires Go 1.1+.
## Supported RabbitMQ Versions
* RabbitMQ 3.x
* RabbitMQ 2.x
All versions require [RabbitMQ Management UI plugin](http://www.rabbitmq.com/management.html) to be installed and enabled.
## Project Maturity
Rabbit Hole is a young project designed after a couple of other RabbitMQ HTTP API clients with stable APIs.
However, breaking API changes are not out of the question.
It is largely (80-90%) feature complete and decently documented.
## Installation
```
go get github.com/michaelklishin/rabbit-hole
```
## Documentation
### Overview
To import the package:
``` go
import (
"github.com/michaelklishin/rabbit-hole"
)
```
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`:
``` go
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
```
SSL/TSL is now available, by adding a Transport Layer to the parameters
of `rabbithole.NewTLSClient`:
``` go
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
However, RabbitMQ-Management does not have SSL/TLS enabled by default,
so you must enable it.
[API reference](http://godoc.org/github.com/michaelklishin/rabbit-hole) is available on [godoc.org](http://godoc.org).
### Getting Overview
``` go
res, err := rmqc.Overview()
```
### Node and Cluster Status
``` go
xs, err := rmqc.ListNodes()
// => []NodeInfo, err
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
```
### Operations on Connections
``` go
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
```
### Operations on Channels
``` go
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
```
### Operations on Vhosts
``` go
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
```
### Managing Users
``` go
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
```
### Managing Permissions
``` go
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
```
### Operations on Exchanges
``` go
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
```
### Operations on Queues
``` go
qs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
qs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
q, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
```
### Operations on Bindings
``` go
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
```
### HTTPS Connections
``` go
var tlsConfig *tls.Config
...
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
### Changing Transport Layer
``` go
var transport *http.Transport
...
rmqc.SetTransport(transport)
```
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push -u origin my-new-feature`)
5. Create new Pull Request
## License & Copyright
2-clause BSD license.
(c) Michael S. Klishin, 2013-2014.
[![Bitdeli Badge](https://d2weczhvl823v0.cloudfront.net/michaelklishin/rabbit-hole/trend.png)](https://bitdeli.com/free "Bitdeli Badge")

View File

@ -0,0 +1,157 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/bindings
//
// Example response:
//
// [
// {
// "source": "",
// "vhost": "\/",
// "destination": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "destination_type": "queue",
// "routing_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "arguments": {
//
// },
// "properties_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w"
// }
// ]
type BindingInfo struct {
// Binding source (exchange name)
Source string `json:"source"`
Vhost string `json:"vhost"`
// Binding destination (queue or exchange name)
Destination string `json:"destination"`
// Destination type, either "queue" or "exchange"
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
Arguments map[string]interface{} `json:"arguments"`
PropertiesKey string `json:"properties_key"`
}
// Returns all bindings
func (c *Client) ListBindings() (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/bindings/{vhost}
//
// Returns all bindings in a virtual host.
func (c *Client) ListBindingsIn(vhost string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/"+url.QueryEscape(vhost))
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{queue}/bindings
//
// Example response:
// [
// {"source":"",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "arguments":{},
// "properties_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA"},
// {"source":"temp",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"",
// "arguments":{},
// "properties_key":"~"}
// ]
// Returns all bindings of individual queue.
func (c *Client) ListQueueBindings(vhost, queue string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/bindings")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// POST /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}
//
// DeclareBinding updates information about a binding between a source and a target
func (c *Client) DeclareBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
info.Vhost = vhost
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "POST", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}/{props}
//
// DeleteBinding delets an individual binding
func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination)+"/"+url.QueryEscape(info.PropertiesKey), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,86 @@
package rabbithole
import "net/url"
// Brief (very incomplete) connection information.
type BriefConnectionDetails struct {
// Connection name
Name string `json:"name"`
// Client port
PeerPort Port `json:"peer_port"`
// Client host
PeerHost string `json:"peer_host"`
}
type ChannelInfo struct {
// Channel number
Number int `json:"number"`
// Channel name
Name string `json:"name"`
// basic.qos (prefetch count) value used
PrefetchCount int `json:"prefetch_count"`
// How many consumers does this channel have
ConsumerCount int `json:"consumer_count"`
// Number of unacknowledged messages on this channel
UnacknowledgedMessageCount int `json:"messages_unacknowledged"`
// Number of messages on this channel unconfirmed to publishers
UnconfirmedMessageCount int `json:"messages_unconfirmed"`
// Number of messages on this channel uncommited to message store
UncommittedMessageCount int `json:"messages_uncommitted"`
// Number of acks on this channel uncommited to message store
UncommittedAckCount int `json:"acks_uncommitted"`
// TODO(mk): custom deserializer to date/time?
IdleSince string `json:"idle_since"`
// True if this channel uses publisher confirms
UsesPublisherConfirms bool `json:"confirm"`
// True if this channel uses transactions
Transactional bool `json:"transactional"`
// True if this channel is blocked via channel.flow
ClientFlowBlocked bool `json:"client_flow_blocked"`
User string `json:"user"`
Vhost string `json:"vhost"`
Node string `json:"node"`
ConnectionDetails BriefConnectionDetails `json:"connection_details"`
}
//
// GET /api/channels
//
// Returns information about all open channels.
func (c *Client) ListChannels() (rec []ChannelInfo, err error) {
req, err := newGETRequest(c, "channels")
if err != nil {
return []ChannelInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ChannelInfo{}, err
}
return rec, nil
}
//
// GET /api/channels/{name}
//
// Returns channel information.
func (c *Client) GetChannel(name string) (rec *ChannelInfo, err error) {
req, err := newGETRequest(c, "channels/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View File

@ -0,0 +1,128 @@
package rabbithole
import (
"bytes"
"encoding/json"
"errors"
"net/http"
"net/url"
)
type Client struct {
// URI of a RabbitMQ node to use, not including the path, e.g. http://127.0.0.1:15672.
Endpoint string
// Username to use. This RabbitMQ user must have the "management" tag.
Username string
// Password to use.
Password string
host string
transport *http.Transport
}
func NewClient(uri string, username string, password string) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
}
return me, nil
}
//NewTLSClient Creates a Client with a Transport Layer; it is up to the developer to make that layer Secure.
func NewTLSClient(uri string, username string, password string, transport *http.Transport) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
transport: transport,
}
return me, nil
}
//SetTransport changes the Transport Layer that the Client will use.
func (c *Client) SetTransport(transport *http.Transport) {
c.transport = transport
}
func newGETRequest(client *Client, path string) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest("GET", s, nil)
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
return req, err
}
func newRequestWithBody(client *Client, method string, path string, body []byte) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest(method, s, bytes.NewReader(body))
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
req.Header.Add("Content-Type", "application/json")
return req, err
}
func executeRequest(client *Client, req *http.Request) (res *http.Response, err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err = httpc.Do(req)
if err != nil {
return nil, err
}
return res, nil
}
func executeAndParseRequest(client *Client, req *http.Request, rec interface{}) (err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err := httpc.Do(req)
if err != nil {
return err
}
if isNotFound(res) {
return errors.New("not found")
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&rec)
if err != nil {
return err
}
return nil
}
func isNotFound(res *http.Response) bool {
return res.StatusCode == http.StatusNotFound
}

View File

@ -0,0 +1,45 @@
package rabbithole
import "strconv"
// Extra arguments as a map (on queues, bindings, etc)
type Properties map[string]interface{}
// Port used by RabbitMQ or clients
type Port int
func (p *Port) UnmarshalJSON(b []byte) error {
stringValue := string(b)
var parsed int64
var err error
if stringValue[0] == '"' && stringValue[len(stringValue)-1] == '"' {
parsed, err = strconv.ParseInt(stringValue[1:len(stringValue)-1], 10, 32)
} else {
parsed, err = strconv.ParseInt(stringValue, 10, 32)
}
if err == nil {
*p = Port(int(parsed))
}
return err
}
// Rate of change of a numerical value
type RateDetails struct {
Rate float32 `json:"rate"`
}
// RabbitMQ context (Erlang app) running on
// a node
type BrokerContext struct {
Node string `json:"node"`
Description string `json:"description"`
Path string `json:"path"`
Port Port `json:"port"`
Ignore bool `json:"ignore_in_use"`
}
// Basic published messages statistics
type MessageStats struct {
Publish int `json:"publish"`
PublishDetails RateDetails `json:"publish_details"`
}

View File

@ -0,0 +1,131 @@
package rabbithole
import (
"net/http"
"net/url"
)
// Provides information about connection to a RabbitMQ node.
type ConnectionInfo struct {
// Connection name
Name string `json:"name"`
// Node the client is connected to
Node string `json:"node"`
// Number of open channels
Channels int `json:"channels"`
// Connection state
State string `json:"state"`
// Connection type, network (via AMQP client) or direct (via direct Erlang client)
Type string `json:"type"`
// Server port
Port Port `json:"port"`
// Client port
PeerPort Port `json:"peer_port"`
// Server host
Host string `json:"host"`
// Client host
PeerHost string `json:"peer_host"`
// Last connection blocking reason, if any
LastBlockedBy string `json:"last_blocked_by"`
// When connection was last blocked
LastBlockedAge string `json:"last_blocked_age"`
// True if connection uses TLS/SSL
UsesTLS bool `json:"ssl"`
// Client certificate subject
PeerCertSubject string `json:"peer_cert_subject"`
// Client certificate validity
PeerCertValidity string `json:"peer_cert_validity"`
// Client certificate issuer
PeerCertIssuer string `json:"peer_cert_issuer"`
// TLS/SSL protocol and version
SSLProtocol string `json:"ssl_protocol"`
// Key exchange mechanism
SSLKeyExchange string `json:"ssl_key_exchange"`
// SSL cipher suite used
SSLCipher string `json:"ssl_cipher"`
// SSL hash
SSLHash string `json:"ssl_hash"`
// Protocol, e.g. AMQP 0-9-1 or MQTT 3-1
Protocol string `json:"protocol"`
User string `json:"user"`
// Virtual host
Vhost string `json:"vhost"`
// Heartbeat timeout
Timeout int `json:"timeout"`
// Maximum frame size (AMQP 0-9-1)
FrameMax int `json:"frame_max"`
// A map of client properties (name, version, capabilities, etc)
ClientProperties Properties `json:"client_properties"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
// Ingress data rate
RecvOctDetails RateDetails `json:"recv_oct_details"`
// Egress data rate
SendOctDetails RateDetails `json:"send_oct_details"`
}
//
// GET /api/connections
//
func (c *Client) ListConnections() (rec []ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections")
if err != nil {
return []ConnectionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ConnectionInfo{}, err
}
return rec, nil
}
//
// GET /api/connections/{name}
//
func (c *Client) GetConnection(name string) (rec *ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// DELETE /api/connections/{name}
//
func (c *Client) CloseConnection(name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "connections/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,173 @@
/*
Rabbit Hole is a Go client for the RabbitMQ HTTP API.
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`.
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
Getting Overview
res, err := rmqc.Overview()
Node and Cluster Status
var err error
// => []NodeInfo, err
xs, err := rmqc.ListNodes()
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
Operations on Connections
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
Operations on Channels
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
Operations on Exchanges
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
Operations on Queues
xs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
xs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
x, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
Operations on Bindings
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
Operations on Vhosts
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
Managing Users
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
Managing Permissions
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
*/
package rabbithole

View File

@ -0,0 +1,219 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/exchanges
//
type IngressEgressStats struct {
PublishIn int `json:"publish_in"`
PublishInDetails RateDetails `json:"publish_in_details"`
PublishOut int `json:"publish_out"`
PublishOutDetails RateDetails `json:"publish_out_details"`
}
type ExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
MessageStats IngressEgressStats `json:"message_stats"`
}
type ExchangeSettings struct {
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) ListExchanges() (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges")
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}
//
func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost))
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}/{name}
//
// Example response:
//
// {
// "incoming": [
// {
// "stats": {
// "publish": 2760,
// "publish_details": {
// "rate": 20
// }
// },
// "channel_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672 (2)",
// "number": 2,
// "connection_name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// }
// }
// ],
// "outgoing": [
// {
// "stats": {
// "publish": 1280,
// "publish_details": {
// "rate": 20
// }
// },
// "queue": {
// "name": "amq.gen-7NhO_yRr4lDdp-8hdnvfuw",
// "vhost": "rabbit\/hole"
// }
// }
// ],
// "message_stats": {
// "publish_in": 2760,
// "publish_in_details": {
// "rate": 20
// },
// "publish_out": 1280,
// "publish_out_details": {
// "rate": 20
// }
// },
// "name": "amq.fanout",
// "vhost": "rabbit\/hole",
// "type": "fanout",
// "durable": true,
// "auto_delete": false,
// "internal": false,
// "arguments": {
// }
// }
type ExchangeIngressDetails struct {
Stats MessageStats `json:"stats"`
ChannelDetails PublishingChannel `json:"channel_details"`
}
type PublishingChannel struct {
Number int `json:"number"`
Name string `json:"name"`
ConnectionName string `json:"connection_name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type NameAndVhost struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
}
type ExchangeEgressDetails struct {
Stats MessageStats `json:"stats"`
Queue NameAndVhost `json:"queue"`
}
type DetailedExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
Incoming []ExchangeIngressDetails `json:"incoming"`
Outgoing []ExchangeEgressDetails `json:"outgoing"`
}
func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost)+"/"+exchange)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/exchanges/{vhost}/{name}
//
func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,72 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Federation definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type FederationDefinition struct {
Uri string `json:"uri"`
Expires int `json:"expires"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops"`
PrefetchCount int `json:"prefetch-count"`
ReconnectDelay int `json:"reconnect-delay"`
AckMode string `json:"ack-mode"`
TrustUserId bool `json:"trust-user-id"`
}
// Represents a configured Federation upstream.
type FederationUpstream struct {
Definition FederationDefinition `json:"value"`
}
//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
//
// Updates a federation upstream
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
fedUp := FederationUpstream{
Definition: fDef,
}
body, err := json.Marshal(fedUp)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
//
// Deletes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,83 @@
package rabbithole
//
// GET /api/overview
//
type QueueTotals struct {
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
}
type ObjectTotals struct {
Consumers int `json:"consumers"`
Queues int `json:"queues"`
Exchanges int `json:"exchanges"`
Connections int `json:"connections"`
Channels int `json:"channels"`
}
type Listener struct {
Node string `json:"node"`
Protocol string `json:"protocol"`
IpAddress string `json:"ip_address"`
Port Port `json:"port"`
}
type Overview struct {
ManagementVersion string `json:"management_version"`
StatisticsLevel string `json:"statistics_level"`
RabbitMQVersion string `json:"rabbitmq_version"`
ErlangVersion string `json:"erlang_version"`
FullErlangVersion string `json:"erlang_full_version"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
MessageStats MessageStats `json:"message_stats"`
QueueTotals QueueTotals `json:"queue_totals"`
ObjectTotals ObjectTotals `json:"object_totals"`
Node string `json:"node"`
StatisticsDBNode string `json:"statistics_db_node"`
Listeners []Listener `json:"listeners"`
Contexts []BrokerContext `json:"contexts"`
}
func (c *Client) Overview() (rec *Overview, err error) {
req, err := newGETRequest(c, "overview")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/whoami
//
type WhoamiInfo struct {
Name string `json:"name"`
Tags string `json:"tags"`
AuthBackend string `json:"auth_backend"`
}
func (c *Client) Whoami() (rec *WhoamiInfo, err error) {
req, err := newGETRequest(c, "whoami")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View File

@ -0,0 +1,301 @@
package rabbithole
import (
"net/url"
)
// TODO: this probably should be fixed in RabbitMQ management plugin
type OsPid string
type NameDescriptionEnabled struct {
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
}
type AuthMechanism NameDescriptionEnabled
type ExchangeType NameDescriptionEnabled
type NameDescriptionVersion struct {
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
}
type ErlangApp NameDescriptionVersion
type NodeInfo struct {
Name string `json:"name"`
NodeType string `json:"type"`
IsRunning bool `json:"running"`
OsPid OsPid `json:"os_pid"`
FdUsed int `json:"fd_used"`
FdTotal int `json:"fd_total"`
SocketsUsed int `json:"sockets_used"`
SocketsTotal int `json:"sockets_total"`
MemUsed int `json:"mem_used"`
MemLimit int `json:"mem_limit"`
MemAlarm bool `json:"mem_alarm"`
DiskFree int `json:"disk_free"`
DiskFreeLimit int `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
// Erlang scheduler run queue length
RunQueueLength uint32 `json:"run_queue"`
Processors uint32 `json:"processors"`
Uptime uint64 `json:"uptime"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
AuthMechanisms []AuthMechanism `json:"auth_mechanisms"`
ErlangApps []ErlangApp `json:"applications"`
Contexts []BrokerContext `json:"contexts"`
}
//
// GET /api/nodes
//
func (c *Client) ListNodes() (rec []NodeInfo, err error) {
req, err := newGETRequest(c, "nodes")
if err != nil {
return []NodeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/nodes/{name}
//
// {
// "partitions": [],
// "os_pid": "39292",
// "fd_used": 35,
// "fd_total": 256,
// "sockets_used": 4,
// "sockets_total": 138,
// "mem_used": 69964432,
// "mem_limit": 2960660889,
// "mem_alarm": false,
// "disk_free_limit": 50000000,
// "disk_free": 188362731520,
// "disk_free_alarm": false,
// "proc_used": 370,
// "proc_total": 1048576,
// "statistics_level": "fine",
// "uptime": 98355255,
// "run_queue": 0,
// "processors": 8,
// "exchange_types": [
// {
// "name": "topic",
// "description": "AMQP topic exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "x-consistent-hash",
// "description": "Consistent Hashing Exchange",
// "enabled": true
// },
// {
// "name": "fanout",
// "description": "AMQP fanout exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "direct",
// "description": "AMQP direct exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "headers",
// "description": "AMQP headers exchange, as per the AMQP specification",
// "enabled": true
// }
// ],
// "auth_mechanisms": [
// {
// "name": "AMQPLAIN",
// "description": "QPid AMQPLAIN mechanism",
// "enabled": true
// },
// {
// "name": "PLAIN",
// "description": "SASL PLAIN authentication mechanism",
// "enabled": true
// },
// {
// "name": "RABBIT-CR-DEMO",
// "description": "RabbitMQ Demo challenge-response authentication mechanism",
// "enabled": false
// }
// ],
// "applications": [
// {
// "name": "amqp_client",
// "description": "RabbitMQ AMQP Client",
// "version": "3.2.0"
// },
// {
// "name": "asn1",
// "description": "The Erlang ASN1 compiler version 2.0.3",
// "version": "2.0.3"
// },
// {
// "name": "cowboy",
// "description": "Small, fast, modular HTTP server.",
// "version": "0.5.0-rmq3.2.0-git4b93c2d"
// },
// {
// "name": "crypto",
// "description": "CRYPTO version 2",
// "version": "3.1"
// },
// {
// "name": "inets",
// "description": "INETS CXC 138 49",
// "version": "5.9.6"
// },
// {
// "name": "kernel",
// "description": "ERTS CXC 138 10",
// "version": "2.16.3"
// },
// {
// "name": "mnesia",
// "description": "MNESIA CXC 138 12",
// "version": "4.10"
// },
// {
// "name": "mochiweb",
// "description": "MochiMedia Web Server",
// "version": "2.7.0-rmq3.2.0-git680dba8"
// },
// {
// "name": "os_mon",
// "description": "CPO CXC 138 46",
// "version": "2.2.13"
// },
// {
// "name": "public_key",
// "description": "Public key infrastructure",
// "version": "0.20"
// },
// {
// "name": "rabbit",
// "description": "RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_consistent_hash_exchange",
// "description": "Consistent Hash Exchange Type",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management",
// "description": "RabbitMQ Management Console",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management_agent",
// "description": "RabbitMQ Management Agent",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_mqtt",
// "description": "RabbitMQ MQTT Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel",
// "description": "Data Shovel for RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel_management",
// "description": "Shovel Status",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_stomp",
// "description": "Embedded Rabbit Stomp Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_dispatch",
// "description": "RabbitMQ Web Dispatcher",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_stomp",
// "description": "Rabbit WEB-STOMP - WebSockets to Stomp adapter",
// "version": "3.2.0"
// },
// {
// "name": "sasl",
// "description": "SASL CXC 138 11",
// "version": "2.3.3"
// },
// {
// "name": "sockjs",
// "description": "SockJS",
// "version": "0.3.4-rmq3.2.0-git3132eb9"
// },
// {
// "name": "ssl",
// "description": "Erlang\/OTP SSL application",
// "version": "5.3.1"
// },
// {
// "name": "stdlib",
// "description": "ERTS CXC 138 10",
// "version": "1.19.3"
// },
// {
// "name": "webmachine",
// "description": "webmachine",
// "version": "1.10.3-rmq3.2.0-gite9359c7"
// },
// {
// "name": "xmerl",
// "description": "XML parser",
// "version": "1.3.4"
// }
// ],
// "contexts": [
// {
// "description": "Redirect to port 15672",
// "path": "\/",
// "port": 55672,
// "ignore_in_use": true
// },
// {
// "description": "RabbitMQ Management",
// "path": "\/",
// "port": 15672
// }
// ],
// "name": "rabbit@mercurio",
// "type": "disc",
// "running": true
// }
func (c *Client) GetNode(name string) (rec *NodeInfo, err error) {
req, err := newGETRequest(c, "nodes/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View File

@ -0,0 +1,126 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/permissions
//
// Example response:
//
// [{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}]
type PermissionInfo struct {
User string `json:"user"`
Vhost string `json:"vhost"`
// Configuration permissions
Configure string `json:"configure"`
// Write permissions
Write string `json:"write"`
// Read permissions
Read string `json:"read"`
}
// Returns permissions for all users and virtual hosts.
func (c *Client) ListPermissions() (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{user}/permissions
//
// Returns permissions of a specific user.
func (c *Client) ListPermissionsOf(username string) (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username)+"/permissions")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/permissions/{vhost}/{user}
//
// Returns permissions of user in virtual host.
func (c *Client) GetPermissionsIn(vhost, username string) (rec PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username))
if err != nil {
return PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return PermissionInfo{}, err
}
return rec, nil
}
//
// PUT /api/permissions/{vhost}/{user}
//
type Permissions struct {
Configure string `json:"configure"`
Write string `json:"write"`
Read string `json:"read"`
}
// Updates permissions of user in virtual host.
func (c *Client) UpdatePermissionsIn(vhost, username string, permissions Permissions) (res *http.Response, err error) {
body, err := json.Marshal(permissions)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/permissions/{vhost}/{user}
//
// Clears (deletes) permissions of user in virtual host.
func (c *Client) ClearPermissionsIn(vhost, username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,31 @@
package rabbithole
func (c *Client) EnabledProtocols() (xs []string, err error) {
overview, err := c.Overview()
if err != nil {
return []string{}, err
}
// we really need to implement Map/Filter/etc. MK.
xs = make([]string, len(overview.Listeners))
for i, lnr := range overview.Listeners {
xs[i] = lnr.Protocol
}
return xs, nil
}
func (c *Client) ProtocolPorts() (res map[string]Port, err error) {
res = map[string]Port{}
overview, err := c.Overview()
if err != nil {
return res, err
}
for _, lnr := range overview.Listeners {
res[lnr.Protocol] = lnr.Port
}
return res, nil
}

View File

@ -0,0 +1,127 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Policy definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type PolicyDefinition map[string]interface{}
type NodeNames []string
// Represents a configured policy.
type Policy struct {
// Virtual host this policy is in.
Vhost string `json:"vhost"`
// Regular expression pattern used to match queues and exchanges,
// , e.g. "^ha\..+"
Pattern string `json:"pattern"`
// What this policy applies to: "queues", "exchanges", etc.
ApplyTo string `json:"apply-to"`
Name string `json:"name"`
Priority int `json:"priority"`
// Additional arguments added to the entities (queues,
// exchanges or both) that match a policy
Definition PolicyDefinition `json:"definition"`
}
//
// GET /api/policies
//
// Return all policies (across all virtual hosts).
func (c *Client) ListPolicies() (rec []Policy, err error) {
req, err := newGETRequest(c, "policies")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}
//
// Returns policies in a specific virtual host.
func (c *Client) ListPoliciesIn(vhost string) (rec []Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}/{name}
//
// Returns individual policy in virtual host.
func (c *Client) GetPolicy(vhost, name string) (rec *Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/policies/{vhost}/{name}
//
// Updates a policy.
func (c *Client) PutPolicy(vhost string, name string, policy Policy) (res *http.Response, err error) {
body, err := json.Marshal(policy)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/policies/{vhost}/{name}
//
// Deletes a policy.
func (c *Client) DeletePolicy(vhost, name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,256 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Information about backing queue (queue storage engine).
type BackingQueueStatus struct {
Q1 int `json:"q1"`
Q2 int `json:"q2"`
Q3 int `json:"q3"`
Q4 int `json:"q4"`
// Total queue length
Length int64 `json:"len"`
// Number of pending acks from consumers
PendingAcks int64 `json:"pending_acks"`
// Number of messages held in RAM
RAMMessageCount int64 `json:"ram_msg_count"`
// Number of outstanding acks held in RAM
RAMAckCount int64 `json:"ram_ack_count"`
// Number of persistent messages in the store
PersistentCount int64 `json:"persistent_count"`
// Average ingress (inbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageIngressRate float64 `json:"avg_ingress_rate"`
// Average egress (outbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageEgressRate float64 `json:"avg_egress_rate"`
// rate at which unacknowledged message records enter RAM,
// e.g. because messages are delivered requiring acknowledgement
AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
// rate at which unacknowledged message records leave RAM,
// e.g. because acks arrive or unacked messages are paged out
AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}
type OwnerPidDetails struct {
Name string `json:"name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type QueueInfo struct {
// Queue name
Name string `json:"name"`
// Virtual host this queue belongs to
Vhost string `json:"vhost"`
// Is this queue durable?
Durable bool `json:"durable"`
// Is this queue auto-delted?
AutoDelete bool `json:"auto_delete"`
// Extra queue arguments
Arguments map[string]interface{} `json:"arguments"`
// RabbitMQ node that hosts master for this queue
Node string `json:"node"`
// Queue status
Status string `json:"status"`
// Total amount of RAM used by this queue
Memory int64 `json:"memory"`
// How many consumers this queue has
Consumers int `json:"consumers"`
// If there is an exclusive consumer, its consumer tag
ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`
// Policy applied to this queue, if any
Policy string `json:"policy"`
// Total number of messages in this queue
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
// Number of messages ready to be delivered
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Number of messages delivered and pending acknowledgements from consumers
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
MessageStats MessageStats `json:"message_stats"`
OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`
BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`
}
type DetailedQueueInfo QueueInfo
//
// GET /api/queues
//
// [
// {
// "owner_pid_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// },
// "message_stats": {
// "publish": 19830,
// "publish_details": {
// "rate": 5
// }
// },
// "messages": 15,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 15,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "policy": "",
// "exclusive_consumer_tag": "",
// "consumers": 0,
// "memory": 143112,
// "backing_queue_status": {
// "q1": 0,
// "q2": 0,
// "delta": [
// "delta",
// "undefined",
// 0,
// "undefined"
// ],
// "q3": 0,
// "q4": 15,
// "len": 15,
// "pending_acks": 0,
// "target_ram_count": "infinity",
// "ram_msg_count": 15,
// "ram_ack_count": 0,
// "next_seq_id": 19830,
// "persistent_count": 0,
// "avg_ingress_rate": 4.9920127795527,
// "avg_egress_rate": 4.9920127795527,
// "avg_ack_ingress_rate": 0,
// "avg_ack_egress_rate": 0
// },
// "status": "running",
// "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
// "vhost": "rabbit\/hole",
// "durable": false,
// "auto_delete": false,
// "arguments": {
// "x-message-ttl": 5000
// },
// "node": "rabbit@marzo"
// }
// ]
func (c *Client) ListQueues() (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues")
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}
//
func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost))
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{name}
//
func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+queue)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
type QueueSettings struct {
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/queues/{vhost}/{name}
//
func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,109 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
type UserInfo struct {
Name string `json:"name"`
PasswordHash string `json:"password_hash"`
// Tags control permissions. Built-in tags: administrator, management, policymaker.
Tags string `json:"tags"`
}
// Settings used to create users. Tags must be comma-separated.
type UserSettings struct {
Name string `json:"name"`
// Tags control permissions. Administrator grants full
// permissions, management grants management UI and HTTP API
// access, policymaker grants policy management permissions.
Tags string `json:"tags"`
// *never* returned by RabbitMQ. Set by the client
// to create/update a user. MK.
Password string `json:"password"`
}
//
// GET /api/users
//
// Example response:
// [{"name":"guest","password_hash":"8LYTIFbVUwi8HuV/dGlp2BYsD1I=","tags":"administrator"}]
// Returns a list of all users in a cluster.
func (c *Client) ListUsers() (rec []UserInfo, err error) {
req, err := newGETRequest(c, "users/")
if err != nil {
return []UserInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []UserInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{name}
//
// Returns information about individual user.
func (c *Client) GetUser(username string) (rec *UserInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/users/{name}
//
// Updates information about individual user.
func (c *Client) PutUser(username string, info UserSettings) (res *http.Response, err error) {
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "users/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/users/{name}
//
// Deletes user.
func (c *Client) DeleteUser(username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "users/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View File

@ -0,0 +1,160 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/vhosts
//
// Example response:
// [
// {
// "message_stats": {
// "publish": 78,
// "publish_details": {
// "rate": 0
// }
// },
// "messages": 0,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 0,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "recv_oct": 16653,
// "recv_oct_details": {
// "rate": 0
// },
// "send_oct": 40495,
// "send_oct_details": {
// "rate": 0
// },
// "name": "\/",
// "tracing": false
// },
// {
// "name": "29dd51888b834698a8b5bc3e7f8623aa1c9671f5",
// "tracing": false
// }
// ]
type VhostInfo struct {
// Virtual host name
Name string `json:"name"`
// True if tracing is enabled for this virtual host
Tracing bool `json:"tracing"`
// Total number of messages in queues of this virtual host
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
// Total number of messages ready to be delivered in queues of this virtual host
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Total number of messages pending acknowledgement from consumers in this virtual host
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
RecvOctDetails RateDetails `json:"recv_oct_details"`
SendOctDetails RateDetails `json:"send_oct_details"`
}
// Returns a list of virtual hosts.
func (c *Client) ListVhosts() (rec []VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts/")
if err != nil {
return []VhostInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []VhostInfo{}, err
}
return rec, nil
}
//
// GET /api/vhosts/{name}
//
// Returns information about a specific virtual host.
func (c *Client) GetVhost(vhostname string) (rec *VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts/"+url.QueryEscape(vhostname))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/vhosts/{name}
//
// Settings used to create or modify virtual hosts.
type VhostSettings struct {
// True if tracing should be enabled.
Tracing bool `json:"tracing"`
}
// Creates or updates a virtual host.
func (c *Client) PutVhost(vhostname string, settings VhostSettings) (res *http.Response, err error) {
body, err := json.Marshal(settings)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "vhosts/"+url.QueryEscape(vhostname), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/vhosts/{name}
//
// Deletes a virtual host.
func (c *Client) DeleteVhost(vhostname string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "vhosts/"+url.QueryEscape(vhostname), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}