Add RabbitMQ secret backend

This commit is contained in:
Kevin Pike 2015-11-18 08:25:42 -08:00 committed by Kevin Pike
parent 0b3c7b177a
commit c755065415
31 changed files with 3798 additions and 0 deletions

View file

@ -0,0 +1,24 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
src/github.com
src/code.google.com

View file

@ -0,0 +1,17 @@
language: go
go:
- 1.3
- 1.4
- tip
services:
- rabbitmq
before_script: ./bin/ci/before_build.sh
script: make
matrix:
allow_failures:
- go: tip
notifications:
email:
- michael@rabbitmq.com

View file

@ -0,0 +1,15 @@
## Will be in 1.0 (First Public Release)
### TLS Support
`rabbithole.NewTLSClient` is a new function which works
much like `NewClient` but additionally accepts a transport.
Contributed by @GrimTheReaper.
### Federation Support
It is now possible to create federation links
over HTTP API.
Contributed by Ryan Grenz.

View file

@ -0,0 +1,23 @@
Copyright (c) 2013, Michael Klishin
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -0,0 +1,24 @@
export GOPATH := $(CURDIR)
COVER_FILE := coverage
all: test
.PHONY: test
test: install-dependencies
go test -v
cover: install-dependencies install-cover
go test -v -test.coverprofile="$(COVER_FILE).prof"
sed -i.bak 's|_'$(GOPATH)'|.|g' $(COVER_FILE).prof
go tool cover -html=$(COVER_FILE).prof -o $(COVER_FILE).html
rm $(COVER_FILE).prof*
install-cover:
go get code.google.com/p/go.tools/cmd/cover
install-dependencies:
go get github.com/onsi/ginkgo
go get github.com/onsi/gomega
go get github.com/streadway/amqp

View file

@ -0,0 +1,293 @@
# Rabbit Hole, a RabbitMQ HTTP API Client for Go
This library is a [RabbitMQ HTTP API](http://hg.rabbitmq.com/rabbitmq-management/raw-file/450b7ea22cfd/priv/www/api/index.html) client for the Go language.
## Supported Go Versions
Rabbit Hole requires Go 1.1+.
## Supported RabbitMQ Versions
* RabbitMQ 3.x
* RabbitMQ 2.x
All versions require [RabbitMQ Management UI plugin](http://www.rabbitmq.com/management.html) to be installed and enabled.
## Project Maturity
Rabbit Hole is a young project designed after a couple of other RabbitMQ HTTP API clients with stable APIs.
However, breaking API changes are not out of the question.
It is largely (80-90%) feature complete and decently documented.
## Installation
```
go get github.com/michaelklishin/rabbit-hole
```
## Documentation
### Overview
To import the package:
``` go
import (
"github.com/michaelklishin/rabbit-hole"
)
```
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`:
``` go
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
```
SSL/TSL is now available, by adding a Transport Layer to the parameters
of `rabbithole.NewTLSClient`:
``` go
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, _ := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
However, RabbitMQ-Management does not have SSL/TLS enabled by default,
so you must enable it.
[API reference](http://godoc.org/github.com/michaelklishin/rabbit-hole) is available on [godoc.org](http://godoc.org).
### Getting Overview
``` go
res, err := rmqc.Overview()
```
### Node and Cluster Status
``` go
xs, err := rmqc.ListNodes()
// => []NodeInfo, err
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
```
### Operations on Connections
``` go
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
```
### Operations on Channels
``` go
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
```
### Operations on Vhosts
``` go
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
```
### Managing Users
``` go
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management,policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
```
### Managing Permissions
``` go
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
```
### Operations on Exchanges
``` go
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
```
### Operations on Queues
``` go
qs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
qs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
q, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
```
### Operations on Bindings
``` go
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
```
### HTTPS Connections
``` go
var tlsConfig *tls.Config
...
transport := &http.Transport{TLSClientConfig: tlsConfig}
rmqc, err := NewTLSClient("https://127.0.0.1:15672", "guest", "guest", transport)
```
### Changing Transport Layer
``` go
var transport *http.Transport
...
rmqc.SetTransport(transport)
```
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push -u origin my-new-feature`)
5. Create new Pull Request
## License & Copyright
2-clause BSD license.
(c) Michael S. Klishin, 2013-2014.
[![Bitdeli Badge](https://d2weczhvl823v0.cloudfront.net/michaelklishin/rabbit-hole/trend.png)](https://bitdeli.com/free "Bitdeli Badge")

View file

@ -0,0 +1,157 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/bindings
//
// Example response:
//
// [
// {
// "source": "",
// "vhost": "\/",
// "destination": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "destination_type": "queue",
// "routing_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w",
// "arguments": {
//
// },
// "properties_key": "amq.gen-Dzw36tPTm_VsmILY9oTG9w"
// }
// ]
type BindingInfo struct {
// Binding source (exchange name)
Source string `json:"source"`
Vhost string `json:"vhost"`
// Binding destination (queue or exchange name)
Destination string `json:"destination"`
// Destination type, either "queue" or "exchange"
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
Arguments map[string]interface{} `json:"arguments"`
PropertiesKey string `json:"properties_key"`
}
// Returns all bindings
func (c *Client) ListBindings() (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/bindings/{vhost}
//
// Returns all bindings in a virtual host.
func (c *Client) ListBindingsIn(vhost string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "bindings/"+url.QueryEscape(vhost))
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{queue}/bindings
//
// Example response:
// [
// {"source":"",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "arguments":{},
// "properties_key":"amq.gen-H0tnavWatL7g7uU2q5cAPA"},
// {"source":"temp",
// "vhost":"/",
// "destination":"amq.gen-H0tnavWatL7g7uU2q5cAPA",
// "destination_type":"queue",
// "routing_key":"",
// "arguments":{},
// "properties_key":"~"}
// ]
// Returns all bindings of individual queue.
func (c *Client) ListQueueBindings(vhost, queue string) (rec []BindingInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue)+"/bindings")
if err != nil {
return []BindingInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []BindingInfo{}, err
}
return rec, nil
}
//
// POST /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}
//
// DeclareBinding updates information about a binding between a source and a target
func (c *Client) DeclareBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
info.Vhost = vhost
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "POST", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/bindings/{vhost}/e/{source}/{destination_type}/{destination}/{props}
//
// DeleteBinding delets an individual binding
func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "bindings/"+url.QueryEscape(vhost)+"/e/"+url.QueryEscape(info.Source)+"/"+url.QueryEscape(string(info.DestinationType[0]))+"/"+url.QueryEscape(info.Destination)+"/"+url.QueryEscape(info.PropertiesKey), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,86 @@
package rabbithole
import "net/url"
// Brief (very incomplete) connection information.
type BriefConnectionDetails struct {
// Connection name
Name string `json:"name"`
// Client port
PeerPort Port `json:"peer_port"`
// Client host
PeerHost string `json:"peer_host"`
}
type ChannelInfo struct {
// Channel number
Number int `json:"number"`
// Channel name
Name string `json:"name"`
// basic.qos (prefetch count) value used
PrefetchCount int `json:"prefetch_count"`
// How many consumers does this channel have
ConsumerCount int `json:"consumer_count"`
// Number of unacknowledged messages on this channel
UnacknowledgedMessageCount int `json:"messages_unacknowledged"`
// Number of messages on this channel unconfirmed to publishers
UnconfirmedMessageCount int `json:"messages_unconfirmed"`
// Number of messages on this channel uncommited to message store
UncommittedMessageCount int `json:"messages_uncommitted"`
// Number of acks on this channel uncommited to message store
UncommittedAckCount int `json:"acks_uncommitted"`
// TODO(mk): custom deserializer to date/time?
IdleSince string `json:"idle_since"`
// True if this channel uses publisher confirms
UsesPublisherConfirms bool `json:"confirm"`
// True if this channel uses transactions
Transactional bool `json:"transactional"`
// True if this channel is blocked via channel.flow
ClientFlowBlocked bool `json:"client_flow_blocked"`
User string `json:"user"`
Vhost string `json:"vhost"`
Node string `json:"node"`
ConnectionDetails BriefConnectionDetails `json:"connection_details"`
}
//
// GET /api/channels
//
// Returns information about all open channels.
func (c *Client) ListChannels() (rec []ChannelInfo, err error) {
req, err := newGETRequest(c, "channels")
if err != nil {
return []ChannelInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ChannelInfo{}, err
}
return rec, nil
}
//
// GET /api/channels/{name}
//
// Returns channel information.
func (c *Client) GetChannel(name string) (rec *ChannelInfo, err error) {
req, err := newGETRequest(c, "channels/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View file

@ -0,0 +1,128 @@
package rabbithole
import (
"bytes"
"encoding/json"
"errors"
"net/http"
"net/url"
)
type Client struct {
// URI of a RabbitMQ node to use, not including the path, e.g. http://127.0.0.1:15672.
Endpoint string
// Username to use. This RabbitMQ user must have the "management" tag.
Username string
// Password to use.
Password string
host string
transport *http.Transport
}
func NewClient(uri string, username string, password string) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
}
return me, nil
}
//NewTLSClient Creates a Client with a Transport Layer; it is up to the developer to make that layer Secure.
func NewTLSClient(uri string, username string, password string, transport *http.Transport) (me *Client, err error) {
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
me = &Client{
Endpoint: uri,
host: u.Host,
Username: username,
Password: password,
transport: transport,
}
return me, nil
}
//SetTransport changes the Transport Layer that the Client will use.
func (c *Client) SetTransport(transport *http.Transport) {
c.transport = transport
}
func newGETRequest(client *Client, path string) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest("GET", s, nil)
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
return req, err
}
func newRequestWithBody(client *Client, method string, path string, body []byte) (*http.Request, error) {
s := client.Endpoint + "/api/" + path
req, err := http.NewRequest(method, s, bytes.NewReader(body))
req.SetBasicAuth(client.Username, client.Password)
// set Opaque to preserve percent-encoded path. MK.
req.URL.Opaque = "//" + client.host + "/api/" + path
req.Header.Add("Content-Type", "application/json")
return req, err
}
func executeRequest(client *Client, req *http.Request) (res *http.Response, err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err = httpc.Do(req)
if err != nil {
return nil, err
}
return res, nil
}
func executeAndParseRequest(client *Client, req *http.Request, rec interface{}) (err error) {
var httpc *http.Client
if client.transport != nil {
httpc = &http.Client{Transport: client.transport}
} else {
httpc = &http.Client{}
}
res, err := httpc.Do(req)
if err != nil {
return err
}
if isNotFound(res) {
return errors.New("not found")
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&rec)
if err != nil {
return err
}
return nil
}
func isNotFound(res *http.Response) bool {
return res.StatusCode == http.StatusNotFound
}

View file

@ -0,0 +1,45 @@
package rabbithole
import "strconv"
// Extra arguments as a map (on queues, bindings, etc)
type Properties map[string]interface{}
// Port used by RabbitMQ or clients
type Port int
func (p *Port) UnmarshalJSON(b []byte) error {
stringValue := string(b)
var parsed int64
var err error
if stringValue[0] == '"' && stringValue[len(stringValue)-1] == '"' {
parsed, err = strconv.ParseInt(stringValue[1:len(stringValue)-1], 10, 32)
} else {
parsed, err = strconv.ParseInt(stringValue, 10, 32)
}
if err == nil {
*p = Port(int(parsed))
}
return err
}
// Rate of change of a numerical value
type RateDetails struct {
Rate float32 `json:"rate"`
}
// RabbitMQ context (Erlang app) running on
// a node
type BrokerContext struct {
Node string `json:"node"`
Description string `json:"description"`
Path string `json:"path"`
Port Port `json:"port"`
Ignore bool `json:"ignore_in_use"`
}
// Basic published messages statistics
type MessageStats struct {
Publish int `json:"publish"`
PublishDetails RateDetails `json:"publish_details"`
}

View file

@ -0,0 +1,131 @@
package rabbithole
import (
"net/http"
"net/url"
)
// Provides information about connection to a RabbitMQ node.
type ConnectionInfo struct {
// Connection name
Name string `json:"name"`
// Node the client is connected to
Node string `json:"node"`
// Number of open channels
Channels int `json:"channels"`
// Connection state
State string `json:"state"`
// Connection type, network (via AMQP client) or direct (via direct Erlang client)
Type string `json:"type"`
// Server port
Port Port `json:"port"`
// Client port
PeerPort Port `json:"peer_port"`
// Server host
Host string `json:"host"`
// Client host
PeerHost string `json:"peer_host"`
// Last connection blocking reason, if any
LastBlockedBy string `json:"last_blocked_by"`
// When connection was last blocked
LastBlockedAge string `json:"last_blocked_age"`
// True if connection uses TLS/SSL
UsesTLS bool `json:"ssl"`
// Client certificate subject
PeerCertSubject string `json:"peer_cert_subject"`
// Client certificate validity
PeerCertValidity string `json:"peer_cert_validity"`
// Client certificate issuer
PeerCertIssuer string `json:"peer_cert_issuer"`
// TLS/SSL protocol and version
SSLProtocol string `json:"ssl_protocol"`
// Key exchange mechanism
SSLKeyExchange string `json:"ssl_key_exchange"`
// SSL cipher suite used
SSLCipher string `json:"ssl_cipher"`
// SSL hash
SSLHash string `json:"ssl_hash"`
// Protocol, e.g. AMQP 0-9-1 or MQTT 3-1
Protocol string `json:"protocol"`
User string `json:"user"`
// Virtual host
Vhost string `json:"vhost"`
// Heartbeat timeout
Timeout int `json:"timeout"`
// Maximum frame size (AMQP 0-9-1)
FrameMax int `json:"frame_max"`
// A map of client properties (name, version, capabilities, etc)
ClientProperties Properties `json:"client_properties"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
// Ingress data rate
RecvOctDetails RateDetails `json:"recv_oct_details"`
// Egress data rate
SendOctDetails RateDetails `json:"send_oct_details"`
}
//
// GET /api/connections
//
func (c *Client) ListConnections() (rec []ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections")
if err != nil {
return []ConnectionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ConnectionInfo{}, err
}
return rec, nil
}
//
// GET /api/connections/{name}
//
func (c *Client) GetConnection(name string) (rec *ConnectionInfo, err error) {
req, err := newGETRequest(c, "connections/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// DELETE /api/connections/{name}
//
func (c *Client) CloseConnection(name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "connections/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,173 @@
/*
Rabbit Hole is a Go client for the RabbitMQ HTTP API.
All HTTP API operations are accessible via `rabbithole.Client`, which
should be instantiated with `rabbithole.NewClient`.
// URI, username, password
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
Getting Overview
res, err := rmqc.Overview()
Node and Cluster Status
var err error
// => []NodeInfo, err
xs, err := rmqc.ListNodes()
node, err := rmqc.GetNode("rabbit@mercurio")
// => NodeInfo, err
Operations on Connections
xs, err := rmqc.ListConnections()
// => []ConnectionInfo, err
conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => ConnectionInfo, err
// Forcefully close connection
_, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672")
// => *http.Response, err
Operations on Channels
xs, err := rmqc.ListChannels()
// => []ChannelInfo, err
ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)")
// => ChannelInfo, err
Operations on Exchanges
xs, err := rmqc.ListExchanges()
// => []ExchangeInfo, err
// list exchanges in a vhost
xs, err := rmqc.ListExchangesIn("/")
// => []ExchangeInfo, err
// information about individual exchange
x, err := rmqc.GetExchange("/", "amq.fanout")
// => ExchangeInfo, err
// declares an exchange
resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false})
// => *http.Response, err
// deletes individual exchange
resp, err := rmqc.DeleteExchange("/", "an.exchange")
// => *http.Response, err
Operations on Queues
xs, err := rmqc.ListQueues()
// => []QueueInfo, err
// list queues in a vhost
xs, err := rmqc.ListQueuesIn("/")
// => []QueueInfo, err
// information about individual queue
x, err := rmqc.GetQueue("/", "a.queue")
// => QueueInfo, err
// declares a queue
resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false})
// => *http.Response, err
// deletes individual queue
resp, err := rmqc.DeleteQueue("/", "a.queue")
// => *http.Response, err
Operations on Bindings
bs, err := rmqc.ListBindings()
// => []BindingInfo, err
// list bindings in a vhost
bs, err := rmqc.ListBindingsIn("/")
// => []BindingInfo, err
// list bindings of a queue
bs, err := rmqc.ListQueueBindings("/", "a.queue")
// => []BindingInfo, err
// declare a binding
resp, err := rmqc.DeclareBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
})
// => *http.Response, err
// deletes individual binding
resp, err := rmqc.DeleteBinding("/", BindingInfo{
Source: "an.exchange",
Destination: "a.queue",
DestinationType: "queue",
RoutingKey: "#",
PropertiesKey: "%23",
})
// => *http.Response, err
Operations on Vhosts
xs, err := rmqc.ListVhosts()
// => []VhostInfo, err
// information about individual vhost
x, err := rmqc.GetVhost("/")
// => VhostInfo, err
// creates or updates individual vhost
resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false})
// => *http.Response, err
// deletes individual vhost
resp, err := rmqc.DeleteVhost("/")
// => *http.Response, err
Managing Users
xs, err := rmqc.ListUsers()
// => []UserInfo, err
// information about individual user
x, err := rmqc.GetUser("my.user")
// => UserInfo, err
// creates or updates individual user
resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"})
// => *http.Response, err
// deletes individual user
resp, err := rmqc.DeleteUser("my.user")
// => *http.Response, err
Managing Permissions
xs, err := rmqc.ListPermissions()
// => []PermissionInfo, err
// permissions of individual user
x, err := rmqc.ListPermissionsOf("my.user")
// => []PermissionInfo, err
// permissions of individual user in vhost
x, err := rmqc.GetPermissionsIn("/", "my.user")
// => PermissionInfo, err
// updates permissions of user in vhost
resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"})
// => *http.Response, err
// revokes permissions in vhost
resp, err := rmqc.ClearPermissionsIn("/", "my.user")
// => *http.Response, err
*/
package rabbithole

View file

@ -0,0 +1,219 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/exchanges
//
type IngressEgressStats struct {
PublishIn int `json:"publish_in"`
PublishInDetails RateDetails `json:"publish_in_details"`
PublishOut int `json:"publish_out"`
PublishOutDetails RateDetails `json:"publish_out_details"`
}
type ExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
MessageStats IngressEgressStats `json:"message_stats"`
}
type ExchangeSettings struct {
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) ListExchanges() (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges")
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}
//
func (c *Client) ListExchangesIn(vhost string) (rec []ExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost))
if err != nil {
return []ExchangeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []ExchangeInfo{}, err
}
return rec, nil
}
//
// GET /api/exchanges/{vhost}/{name}
//
// Example response:
//
// {
// "incoming": [
// {
// "stats": {
// "publish": 2760,
// "publish_details": {
// "rate": 20
// }
// },
// "channel_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672 (2)",
// "number": 2,
// "connection_name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// }
// }
// ],
// "outgoing": [
// {
// "stats": {
// "publish": 1280,
// "publish_details": {
// "rate": 20
// }
// },
// "queue": {
// "name": "amq.gen-7NhO_yRr4lDdp-8hdnvfuw",
// "vhost": "rabbit\/hole"
// }
// }
// ],
// "message_stats": {
// "publish_in": 2760,
// "publish_in_details": {
// "rate": 20
// },
// "publish_out": 1280,
// "publish_out_details": {
// "rate": 20
// }
// },
// "name": "amq.fanout",
// "vhost": "rabbit\/hole",
// "type": "fanout",
// "durable": true,
// "auto_delete": false,
// "internal": false,
// "arguments": {
// }
// }
type ExchangeIngressDetails struct {
Stats MessageStats `json:"stats"`
ChannelDetails PublishingChannel `json:"channel_details"`
}
type PublishingChannel struct {
Number int `json:"number"`
Name string `json:"name"`
ConnectionName string `json:"connection_name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type NameAndVhost struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
}
type ExchangeEgressDetails struct {
Stats MessageStats `json:"stats"`
Queue NameAndVhost `json:"queue"`
}
type DetailedExchangeInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments"`
Incoming []ExchangeIngressDetails `json:"incoming"`
Outgoing []ExchangeEgressDetails `json:"outgoing"`
}
func (c *Client) GetExchange(vhost, exchange string) (rec *DetailedExchangeInfo, err error) {
req, err := newGETRequest(c, "exchanges/"+url.QueryEscape(vhost)+"/"+exchange)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
func (c *Client) DeclareExchange(vhost, exchange string, info ExchangeSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/exchanges/{vhost}/{name}
//
func (c *Client) DeleteExchange(vhost, exchange string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "exchanges/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(exchange), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,72 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Federation definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type FederationDefinition struct {
Uri string `json:"uri"`
Expires int `json:"expires"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops"`
PrefetchCount int `json:"prefetch-count"`
ReconnectDelay int `json:"reconnect-delay"`
AckMode string `json:"ack-mode"`
TrustUserId bool `json:"trust-user-id"`
}
// Represents a configured Federation upstream.
type FederationUpstream struct {
Definition FederationDefinition `json:"value"`
}
//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
//
// Updates a federation upstream
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
fedUp := FederationUpstream{
Definition: fDef,
}
body, err := json.Marshal(fedUp)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
//
// Deletes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(upstreamName), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,83 @@
package rabbithole
//
// GET /api/overview
//
type QueueTotals struct {
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
}
type ObjectTotals struct {
Consumers int `json:"consumers"`
Queues int `json:"queues"`
Exchanges int `json:"exchanges"`
Connections int `json:"connections"`
Channels int `json:"channels"`
}
type Listener struct {
Node string `json:"node"`
Protocol string `json:"protocol"`
IpAddress string `json:"ip_address"`
Port Port `json:"port"`
}
type Overview struct {
ManagementVersion string `json:"management_version"`
StatisticsLevel string `json:"statistics_level"`
RabbitMQVersion string `json:"rabbitmq_version"`
ErlangVersion string `json:"erlang_version"`
FullErlangVersion string `json:"erlang_full_version"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
MessageStats MessageStats `json:"message_stats"`
QueueTotals QueueTotals `json:"queue_totals"`
ObjectTotals ObjectTotals `json:"object_totals"`
Node string `json:"node"`
StatisticsDBNode string `json:"statistics_db_node"`
Listeners []Listener `json:"listeners"`
Contexts []BrokerContext `json:"contexts"`
}
func (c *Client) Overview() (rec *Overview, err error) {
req, err := newGETRequest(c, "overview")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/whoami
//
type WhoamiInfo struct {
Name string `json:"name"`
Tags string `json:"tags"`
AuthBackend string `json:"auth_backend"`
}
func (c *Client) Whoami() (rec *WhoamiInfo, err error) {
req, err := newGETRequest(c, "whoami")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View file

@ -0,0 +1,301 @@
package rabbithole
import (
"net/url"
)
// TODO: this probably should be fixed in RabbitMQ management plugin
type OsPid string
type NameDescriptionEnabled struct {
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
}
type AuthMechanism NameDescriptionEnabled
type ExchangeType NameDescriptionEnabled
type NameDescriptionVersion struct {
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
}
type ErlangApp NameDescriptionVersion
type NodeInfo struct {
Name string `json:"name"`
NodeType string `json:"type"`
IsRunning bool `json:"running"`
OsPid OsPid `json:"os_pid"`
FdUsed int `json:"fd_used"`
FdTotal int `json:"fd_total"`
SocketsUsed int `json:"sockets_used"`
SocketsTotal int `json:"sockets_total"`
MemUsed int `json:"mem_used"`
MemLimit int `json:"mem_limit"`
MemAlarm bool `json:"mem_alarm"`
DiskFree int `json:"disk_free"`
DiskFreeLimit int `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
// Erlang scheduler run queue length
RunQueueLength uint32 `json:"run_queue"`
Processors uint32 `json:"processors"`
Uptime uint64 `json:"uptime"`
ExchangeTypes []ExchangeType `json:"exchange_types"`
AuthMechanisms []AuthMechanism `json:"auth_mechanisms"`
ErlangApps []ErlangApp `json:"applications"`
Contexts []BrokerContext `json:"contexts"`
}
//
// GET /api/nodes
//
func (c *Client) ListNodes() (rec []NodeInfo, err error) {
req, err := newGETRequest(c, "nodes")
if err != nil {
return []NodeInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/nodes/{name}
//
// {
// "partitions": [],
// "os_pid": "39292",
// "fd_used": 35,
// "fd_total": 256,
// "sockets_used": 4,
// "sockets_total": 138,
// "mem_used": 69964432,
// "mem_limit": 2960660889,
// "mem_alarm": false,
// "disk_free_limit": 50000000,
// "disk_free": 188362731520,
// "disk_free_alarm": false,
// "proc_used": 370,
// "proc_total": 1048576,
// "statistics_level": "fine",
// "uptime": 98355255,
// "run_queue": 0,
// "processors": 8,
// "exchange_types": [
// {
// "name": "topic",
// "description": "AMQP topic exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "x-consistent-hash",
// "description": "Consistent Hashing Exchange",
// "enabled": true
// },
// {
// "name": "fanout",
// "description": "AMQP fanout exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "direct",
// "description": "AMQP direct exchange, as per the AMQP specification",
// "enabled": true
// },
// {
// "name": "headers",
// "description": "AMQP headers exchange, as per the AMQP specification",
// "enabled": true
// }
// ],
// "auth_mechanisms": [
// {
// "name": "AMQPLAIN",
// "description": "QPid AMQPLAIN mechanism",
// "enabled": true
// },
// {
// "name": "PLAIN",
// "description": "SASL PLAIN authentication mechanism",
// "enabled": true
// },
// {
// "name": "RABBIT-CR-DEMO",
// "description": "RabbitMQ Demo challenge-response authentication mechanism",
// "enabled": false
// }
// ],
// "applications": [
// {
// "name": "amqp_client",
// "description": "RabbitMQ AMQP Client",
// "version": "3.2.0"
// },
// {
// "name": "asn1",
// "description": "The Erlang ASN1 compiler version 2.0.3",
// "version": "2.0.3"
// },
// {
// "name": "cowboy",
// "description": "Small, fast, modular HTTP server.",
// "version": "0.5.0-rmq3.2.0-git4b93c2d"
// },
// {
// "name": "crypto",
// "description": "CRYPTO version 2",
// "version": "3.1"
// },
// {
// "name": "inets",
// "description": "INETS CXC 138 49",
// "version": "5.9.6"
// },
// {
// "name": "kernel",
// "description": "ERTS CXC 138 10",
// "version": "2.16.3"
// },
// {
// "name": "mnesia",
// "description": "MNESIA CXC 138 12",
// "version": "4.10"
// },
// {
// "name": "mochiweb",
// "description": "MochiMedia Web Server",
// "version": "2.7.0-rmq3.2.0-git680dba8"
// },
// {
// "name": "os_mon",
// "description": "CPO CXC 138 46",
// "version": "2.2.13"
// },
// {
// "name": "public_key",
// "description": "Public key infrastructure",
// "version": "0.20"
// },
// {
// "name": "rabbit",
// "description": "RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_consistent_hash_exchange",
// "description": "Consistent Hash Exchange Type",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management",
// "description": "RabbitMQ Management Console",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_management_agent",
// "description": "RabbitMQ Management Agent",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_mqtt",
// "description": "RabbitMQ MQTT Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel",
// "description": "Data Shovel for RabbitMQ",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_shovel_management",
// "description": "Shovel Status",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_stomp",
// "description": "Embedded Rabbit Stomp Adapter",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_dispatch",
// "description": "RabbitMQ Web Dispatcher",
// "version": "3.2.0"
// },
// {
// "name": "rabbitmq_web_stomp",
// "description": "Rabbit WEB-STOMP - WebSockets to Stomp adapter",
// "version": "3.2.0"
// },
// {
// "name": "sasl",
// "description": "SASL CXC 138 11",
// "version": "2.3.3"
// },
// {
// "name": "sockjs",
// "description": "SockJS",
// "version": "0.3.4-rmq3.2.0-git3132eb9"
// },
// {
// "name": "ssl",
// "description": "Erlang\/OTP SSL application",
// "version": "5.3.1"
// },
// {
// "name": "stdlib",
// "description": "ERTS CXC 138 10",
// "version": "1.19.3"
// },
// {
// "name": "webmachine",
// "description": "webmachine",
// "version": "1.10.3-rmq3.2.0-gite9359c7"
// },
// {
// "name": "xmerl",
// "description": "XML parser",
// "version": "1.3.4"
// }
// ],
// "contexts": [
// {
// "description": "Redirect to port 15672",
// "path": "\/",
// "port": 55672,
// "ignore_in_use": true
// },
// {
// "description": "RabbitMQ Management",
// "path": "\/",
// "port": 15672
// }
// ],
// "name": "rabbit@mercurio",
// "type": "disc",
// "running": true
// }
func (c *Client) GetNode(name string) (rec *NodeInfo, err error) {
req, err := newGETRequest(c, "nodes/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}

View file

@ -0,0 +1,126 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/permissions
//
// Example response:
//
// [{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}]
type PermissionInfo struct {
User string `json:"user"`
Vhost string `json:"vhost"`
// Configuration permissions
Configure string `json:"configure"`
// Write permissions
Write string `json:"write"`
// Read permissions
Read string `json:"read"`
}
// Returns permissions for all users and virtual hosts.
func (c *Client) ListPermissions() (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{user}/permissions
//
// Returns permissions of a specific user.
func (c *Client) ListPermissionsOf(username string) (rec []PermissionInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username)+"/permissions")
if err != nil {
return []PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []PermissionInfo{}, err
}
return rec, nil
}
//
// GET /api/permissions/{vhost}/{user}
//
// Returns permissions of user in virtual host.
func (c *Client) GetPermissionsIn(vhost, username string) (rec PermissionInfo, err error) {
req, err := newGETRequest(c, "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username))
if err != nil {
return PermissionInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return PermissionInfo{}, err
}
return rec, nil
}
//
// PUT /api/permissions/{vhost}/{user}
//
type Permissions struct {
Configure string `json:"configure"`
Write string `json:"write"`
Read string `json:"read"`
}
// Updates permissions of user in virtual host.
func (c *Client) UpdatePermissionsIn(vhost, username string, permissions Permissions) (res *http.Response, err error) {
body, err := json.Marshal(permissions)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/permissions/{vhost}/{user}
//
// Clears (deletes) permissions of user in virtual host.
func (c *Client) ClearPermissionsIn(vhost, username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "permissions/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,31 @@
package rabbithole
func (c *Client) EnabledProtocols() (xs []string, err error) {
overview, err := c.Overview()
if err != nil {
return []string{}, err
}
// we really need to implement Map/Filter/etc. MK.
xs = make([]string, len(overview.Listeners))
for i, lnr := range overview.Listeners {
xs[i] = lnr.Protocol
}
return xs, nil
}
func (c *Client) ProtocolPorts() (res map[string]Port, err error) {
res = map[string]Port{}
overview, err := c.Overview()
if err != nil {
return res, err
}
for _, lnr := range overview.Listeners {
res[lnr.Protocol] = lnr.Port
}
return res, nil
}

View file

@ -0,0 +1,127 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Policy definition: additional arguments
// added to the entities (queues, exchanges or both)
// that match a policy.
type PolicyDefinition map[string]interface{}
type NodeNames []string
// Represents a configured policy.
type Policy struct {
// Virtual host this policy is in.
Vhost string `json:"vhost"`
// Regular expression pattern used to match queues and exchanges,
// , e.g. "^ha\..+"
Pattern string `json:"pattern"`
// What this policy applies to: "queues", "exchanges", etc.
ApplyTo string `json:"apply-to"`
Name string `json:"name"`
Priority int `json:"priority"`
// Additional arguments added to the entities (queues,
// exchanges or both) that match a policy
Definition PolicyDefinition `json:"definition"`
}
//
// GET /api/policies
//
// Return all policies (across all virtual hosts).
func (c *Client) ListPolicies() (rec []Policy, err error) {
req, err := newGETRequest(c, "policies")
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}
//
// Returns policies in a specific virtual host.
func (c *Client) ListPoliciesIn(vhost string) (rec []Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// GET /api/policies/{vhost}/{name}
//
// Returns individual policy in virtual host.
func (c *Client) GetPolicy(vhost, name string) (rec *Policy, err error) {
req, err := newGETRequest(c, "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/policies/{vhost}/{name}
//
// Updates a policy.
func (c *Client) PutPolicy(vhost string, name string, policy Policy) (res *http.Response, err error) {
body, err := json.Marshal(policy)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/policies/{vhost}/{name}
//
// Deletes a policy.
func (c *Client) DeletePolicy(vhost, name string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "policies/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(name), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,256 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
// Information about backing queue (queue storage engine).
type BackingQueueStatus struct {
Q1 int `json:"q1"`
Q2 int `json:"q2"`
Q3 int `json:"q3"`
Q4 int `json:"q4"`
// Total queue length
Length int64 `json:"len"`
// Number of pending acks from consumers
PendingAcks int64 `json:"pending_acks"`
// Number of messages held in RAM
RAMMessageCount int64 `json:"ram_msg_count"`
// Number of outstanding acks held in RAM
RAMAckCount int64 `json:"ram_ack_count"`
// Number of persistent messages in the store
PersistentCount int64 `json:"persistent_count"`
// Average ingress (inbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageIngressRate float64 `json:"avg_ingress_rate"`
// Average egress (outbound) rate, not including messages
// that straight through to auto-acking consumers.
AverageEgressRate float64 `json:"avg_egress_rate"`
// rate at which unacknowledged message records enter RAM,
// e.g. because messages are delivered requiring acknowledgement
AverageAckIngressRate float32 `json:"avg_ack_ingress_rate"`
// rate at which unacknowledged message records leave RAM,
// e.g. because acks arrive or unacked messages are paged out
AverageAckEgressRate float32 `json:"avg_ack_egress_rate"`
}
type OwnerPidDetails struct {
Name string `json:"name"`
PeerPort Port `json:"peer_port"`
PeerHost string `json:"peer_host"`
}
type QueueInfo struct {
// Queue name
Name string `json:"name"`
// Virtual host this queue belongs to
Vhost string `json:"vhost"`
// Is this queue durable?
Durable bool `json:"durable"`
// Is this queue auto-delted?
AutoDelete bool `json:"auto_delete"`
// Extra queue arguments
Arguments map[string]interface{} `json:"arguments"`
// RabbitMQ node that hosts master for this queue
Node string `json:"node"`
// Queue status
Status string `json:"status"`
// Total amount of RAM used by this queue
Memory int64 `json:"memory"`
// How many consumers this queue has
Consumers int `json:"consumers"`
// If there is an exclusive consumer, its consumer tag
ExclusiveConsumerTag string `json:"exclusive_consumer_tag"`
// Policy applied to this queue, if any
Policy string `json:"policy"`
// Total number of messages in this queue
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
// Number of messages ready to be delivered
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Number of messages delivered and pending acknowledgements from consumers
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
MessageStats MessageStats `json:"message_stats"`
OwnerPidDetails OwnerPidDetails `json:"owner_pid_details"`
BackingQueueStatus BackingQueueStatus `json:"backing_queue_status"`
}
type DetailedQueueInfo QueueInfo
//
// GET /api/queues
//
// [
// {
// "owner_pid_details": {
// "name": "127.0.0.1:46928 -> 127.0.0.1:5672",
// "peer_port": 46928,
// "peer_host": "127.0.0.1"
// },
// "message_stats": {
// "publish": 19830,
// "publish_details": {
// "rate": 5
// }
// },
// "messages": 15,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 15,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "policy": "",
// "exclusive_consumer_tag": "",
// "consumers": 0,
// "memory": 143112,
// "backing_queue_status": {
// "q1": 0,
// "q2": 0,
// "delta": [
// "delta",
// "undefined",
// 0,
// "undefined"
// ],
// "q3": 0,
// "q4": 15,
// "len": 15,
// "pending_acks": 0,
// "target_ram_count": "infinity",
// "ram_msg_count": 15,
// "ram_ack_count": 0,
// "next_seq_id": 19830,
// "persistent_count": 0,
// "avg_ingress_rate": 4.9920127795527,
// "avg_egress_rate": 4.9920127795527,
// "avg_ack_ingress_rate": 0,
// "avg_ack_egress_rate": 0
// },
// "status": "running",
// "name": "amq.gen-QLEaT5Rn_ogbN3O8ZOQt3Q",
// "vhost": "rabbit\/hole",
// "durable": false,
// "auto_delete": false,
// "arguments": {
// "x-message-ttl": 5000
// },
// "node": "rabbit@marzo"
// }
// ]
func (c *Client) ListQueues() (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues")
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}
//
func (c *Client) ListQueuesIn(vhost string) (rec []QueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost))
if err != nil {
return []QueueInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []QueueInfo{}, err
}
return rec, nil
}
//
// GET /api/queues/{vhost}/{name}
//
func (c *Client) GetQueue(vhost, queue string) (rec *DetailedQueueInfo, err error) {
req, err := newGETRequest(c, "queues/"+url.QueryEscape(vhost)+"/"+queue)
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/exchanges/{vhost}/{exchange}
//
type QueueSettings struct {
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments map[string]interface{} `json:"arguments"`
}
func (c *Client) DeclareQueue(vhost, queue string, info QueueSettings) (res *http.Response, err error) {
if info.Arguments == nil {
info.Arguments = make(map[string]interface{})
}
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/queues/{vhost}/{name}
//
func (c *Client) DeleteQueue(vhost, queue string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "queues/"+url.QueryEscape(vhost)+"/"+url.QueryEscape(queue), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,109 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
type UserInfo struct {
Name string `json:"name"`
PasswordHash string `json:"password_hash"`
// Tags control permissions. Built-in tags: administrator, management, policymaker.
Tags string `json:"tags"`
}
// Settings used to create users. Tags must be comma-separated.
type UserSettings struct {
Name string `json:"name"`
// Tags control permissions. Administrator grants full
// permissions, management grants management UI and HTTP API
// access, policymaker grants policy management permissions.
Tags string `json:"tags"`
// *never* returned by RabbitMQ. Set by the client
// to create/update a user. MK.
Password string `json:"password"`
}
//
// GET /api/users
//
// Example response:
// [{"name":"guest","password_hash":"8LYTIFbVUwi8HuV/dGlp2BYsD1I=","tags":"administrator"}]
// Returns a list of all users in a cluster.
func (c *Client) ListUsers() (rec []UserInfo, err error) {
req, err := newGETRequest(c, "users/")
if err != nil {
return []UserInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []UserInfo{}, err
}
return rec, nil
}
//
// GET /api/users/{name}
//
// Returns information about individual user.
func (c *Client) GetUser(username string) (rec *UserInfo, err error) {
req, err := newGETRequest(c, "users/"+url.QueryEscape(username))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/users/{name}
//
// Updates information about individual user.
func (c *Client) PutUser(username string, info UserSettings) (res *http.Response, err error) {
body, err := json.Marshal(info)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "users/"+url.QueryEscape(username), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/users/{name}
//
// Deletes user.
func (c *Client) DeleteUser(username string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "users/"+url.QueryEscape(username), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,160 @@
package rabbithole
import (
"encoding/json"
"net/http"
"net/url"
)
//
// GET /api/vhosts
//
// Example response:
// [
// {
// "message_stats": {
// "publish": 78,
// "publish_details": {
// "rate": 0
// }
// },
// "messages": 0,
// "messages_details": {
// "rate": 0
// },
// "messages_ready": 0,
// "messages_ready_details": {
// "rate": 0
// },
// "messages_unacknowledged": 0,
// "messages_unacknowledged_details": {
// "rate": 0
// },
// "recv_oct": 16653,
// "recv_oct_details": {
// "rate": 0
// },
// "send_oct": 40495,
// "send_oct_details": {
// "rate": 0
// },
// "name": "\/",
// "tracing": false
// },
// {
// "name": "29dd51888b834698a8b5bc3e7f8623aa1c9671f5",
// "tracing": false
// }
// ]
type VhostInfo struct {
// Virtual host name
Name string `json:"name"`
// True if tracing is enabled for this virtual host
Tracing bool `json:"tracing"`
// Total number of messages in queues of this virtual host
Messages int `json:"messages"`
MessagesDetails RateDetails `json:"messages_details"`
// Total number of messages ready to be delivered in queues of this virtual host
MessagesReady int `json:"messages_ready"`
MessagesReadyDetails RateDetails `json:"messages_ready_details"`
// Total number of messages pending acknowledgement from consumers in this virtual host
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessagesUnacknowledgedDetails RateDetails `json:"messages_unacknowledged_details"`
// Octets received
RecvOct uint64 `json:"recv_oct"`
// Octets sent
SendOct uint64 `json:"send_oct"`
RecvCount uint64 `json:"recv_cnt"`
SendCount uint64 `json:"send_cnt"`
SendPending uint64 `json:"send_pend"`
RecvOctDetails RateDetails `json:"recv_oct_details"`
SendOctDetails RateDetails `json:"send_oct_details"`
}
// Returns a list of virtual hosts.
func (c *Client) ListVhosts() (rec []VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts/")
if err != nil {
return []VhostInfo{}, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return []VhostInfo{}, err
}
return rec, nil
}
//
// GET /api/vhosts/{name}
//
// Returns information about a specific virtual host.
func (c *Client) GetVhost(vhostname string) (rec *VhostInfo, err error) {
req, err := newGETRequest(c, "vhosts/"+url.QueryEscape(vhostname))
if err != nil {
return nil, err
}
if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}
return rec, nil
}
//
// PUT /api/vhosts/{name}
//
// Settings used to create or modify virtual hosts.
type VhostSettings struct {
// True if tracing should be enabled.
Tracing bool `json:"tracing"`
}
// Creates or updates a virtual host.
func (c *Client) PutVhost(vhostname string, settings VhostSettings) (res *http.Response, err error) {
body, err := json.Marshal(settings)
if err != nil {
return nil, err
}
req, err := newRequestWithBody(c, "PUT", "vhosts/"+url.QueryEscape(vhostname), body)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}
//
// DELETE /api/vhosts/{name}
//
// Deletes a virtual host.
func (c *Client) DeleteVhost(vhostname string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "vhosts/"+url.QueryEscape(vhostname), nil)
if err != nil {
return nil, err
}
res, err = executeRequest(c, req)
if err != nil {
return nil, err
}
return res, nil
}

View file

@ -0,0 +1,118 @@
package rabbitmq
import (
"fmt"
"strings"
"sync"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
"github.com/michaelklishin/rabbit-hole"
)
// Factory creates and configures Backends
func Factory(conf *logical.BackendConfig) (logical.Backend, error) {
return Backend().Setup(conf)
}
// Backend creates a new Backend
func Backend() *framework.Backend {
var b backend
b.Backend = &framework.Backend{
Help: strings.TrimSpace(backendHelp),
PathsSpecial: &logical.Paths{
Root: []string{
"config/*",
},
},
Paths: []*framework.Path{
pathConfigConnection(&b),
pathConfigLease(&b),
pathRoles(&b),
pathRoleCreate(&b),
},
Secrets: []*framework.Secret{
secretCreds(&b),
},
Clean: b.ResetClient,
}
return b.Backend
}
type backend struct {
*framework.Backend
client *rabbithole.Client
lock sync.Mutex
}
// DB returns the database connection.
func (b *backend) Client(s logical.Storage) (*rabbithole.Client, error) {
b.lock.Lock()
defer b.lock.Unlock()
// If we already have a client, we got it!
if b.client != nil {
return b.client, nil
}
// Otherwise, attempt to make connection
entry, err := s.Get("config/connection")
if err != nil {
return nil, err
}
if entry == nil {
return nil,
fmt.Errorf("configure the client connection with config/connection first")
}
var connConfig connectionConfig
if err := entry.DecodeJSON(&connConfig); err != nil {
return nil, err
}
b.client, err = rabbithole.NewClient(connConfig.URI, connConfig.Username, connConfig.Password)
if err != nil {
return nil, err
}
return b.client, nil
}
// ResetClient forces a connection next time Client() is called.
func (b *backend) ResetClient() {
b.lock.Lock()
defer b.lock.Unlock()
b.client = nil
}
// Lease returns the lease information
func (b *backend) Lease(s logical.Storage) (*configLease, error) {
entry, err := s.Get("config/lease")
if err != nil {
return nil, err
}
if entry == nil {
return nil, nil
}
var result configLease
if err := entry.DecodeJSON(&result); err != nil {
return nil, err
}
return &result, nil
}
const backendHelp = `
The RabbitMQ backend dynamically generates RabbitMQ users.
After mounting this backend, configure it using the endpoints within
the "config/" path.
`

View file

@ -0,0 +1,200 @@
package rabbitmq
import (
"encoding/json"
"fmt"
"log"
"os"
"testing"
"github.com/hashicorp/vault/logical"
logicaltest "github.com/hashicorp/vault/logical/testing"
"github.com/michaelklishin/rabbit-hole"
"github.com/mitchellh/mapstructure"
)
func TestBackend_basic(t *testing.T) {
b, _ := Factory(logical.TestBackendConfig())
logicaltest.Test(t, logicaltest.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Backend: b,
Steps: []logicaltest.TestStep{
testAccStepConfig(t),
testAccStepRole(t),
testAccStepReadCreds(t, b, "web"),
},
})
}
func TestBackend_roleCrud(t *testing.T) {
b, _ := Factory(logical.TestBackendConfig())
logicaltest.Test(t, logicaltest.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Backend: b,
Steps: []logicaltest.TestStep{
testAccStepConfig(t),
testAccStepRole(t),
testAccStepReadRole(t, "web", "administrator", `{"/": {"configure": ".*", "write": ".*", "read": ".*"}}`),
testAccStepDeleteRole(t, "web"),
testAccStepReadRole(t, "web", "", ""),
},
})
}
func testAccPreCheck(t *testing.T) {
if uri := os.Getenv("RABBITMQ_MG_URI"); uri == "" {
t.Fatal("RABBITMQ_MG_URI must be set for acceptance tests")
}
if username := os.Getenv("RABBITMQ_MG_USERNAME"); username == "" {
t.Fatal("RABBITMQ_MG_USERNAME must be set for acceptance tests")
}
if password := os.Getenv("RABBITMQ_MG_PASSWORD"); password == "" {
t.Fatal("RABBITMQ_MG_PASSWORD must be set for acceptance tests")
}
}
func testAccStepConfig(t *testing.T) logicaltest.TestStep {
return logicaltest.TestStep{
Operation: logical.WriteOperation,
Path: "config/connection",
Data: map[string]interface{}{
"uri": os.Getenv("RABBITMQ_MG_URI"),
"username": os.Getenv("RABBITMQ_MG_USERNAME"),
"password": os.Getenv("RABBITMQ_MG_PASSWORD"),
},
}
}
func testAccStepRole(t *testing.T) logicaltest.TestStep {
return logicaltest.TestStep{
Operation: logical.WriteOperation,
Path: "roles/web",
Data: map[string]interface{}{
"tags": "administrator",
"vhosts": `{"/": {"configure": ".*", "write": ".*", "read": ".*"}}`,
},
}
}
func testAccStepDeleteRole(t *testing.T, n string) logicaltest.TestStep {
return logicaltest.TestStep{
Operation: logical.DeleteOperation,
Path: "roles/" + n,
}
}
func testAccStepReadCreds(t *testing.T, b logical.Backend, name string) logicaltest.TestStep {
return logicaltest.TestStep{
Operation: logical.ReadOperation,
Path: "creds/" + name,
Check: func(resp *logical.Response) error {
var d struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
if err := mapstructure.Decode(resp.Data, &d); err != nil {
return err
}
log.Printf("[WARN] Generated credentials: %v", d)
uri := os.Getenv("RABBITMQ_MG_URI")
client, err := rabbithole.NewClient(uri, d.Username, d.Password)
if err != nil {
t.Fatal(err)
}
_, err = client.ListVhosts()
if err != nil {
t.Fatalf("unable to list vhosts with generated credentials: %s", err)
}
resp, err = b.HandleRequest(&logical.Request{
Operation: logical.RevokeOperation,
Secret: &logical.Secret{
InternalData: map[string]interface{}{
"secret_type": "creds",
"username": d.Username,
},
},
})
if err != nil {
return err
}
if resp != nil {
if resp.IsError() {
return fmt.Errorf("Error on resp: %#v", *resp)
}
}
client, err = rabbithole.NewClient(uri, d.Username, d.Password)
if err != nil {
t.Fatal(err)
}
_, err = client.ListVhosts()
if err == nil {
t.Fatalf("expected to fail listing vhosts: %s", err)
}
return nil
},
}
}
func testAccStepReadRole(t *testing.T, name, tags, rawVHosts string) logicaltest.TestStep {
return logicaltest.TestStep{
Operation: logical.ReadOperation,
Path: "roles/" + name,
Check: func(resp *logical.Response) error {
if resp == nil {
if tags == "" && rawVHosts == "" {
return nil
}
return fmt.Errorf("bad: %#v", resp)
}
var d struct {
Tags string `mapstructure:"tags"`
VHosts map[string]vhostPermission `mapstructure:"vhosts"`
}
if err := mapstructure.Decode(resp.Data, &d); err != nil {
return err
}
if d.Tags != tags {
return fmt.Errorf("bad: %#v", resp)
}
var vhosts map[string]vhostPermission
if err := json.Unmarshal([]byte(rawVHosts), &vhosts); err != nil {
return fmt.Errorf("bad expected vhosts %#v: %s", vhosts, err)
}
for host, permission := range vhosts {
actualPermission, ok := d.VHosts[host]
if !ok {
return fmt.Errorf("expected vhost: %s", host)
}
if actualPermission.Configure != permission.Configure {
fmt.Errorf("expected permission %s to be %s, got %s", "configure", permission.Configure, actualPermission.Configure)
}
if actualPermission.Write != permission.Write {
fmt.Errorf("expected permission %s to be %s, got %s", "write", permission.Write, actualPermission.Write)
}
if actualPermission.Read != permission.Read {
fmt.Errorf("expected permission %s to be %s, got %s", "read", permission.Read, actualPermission.Read)
}
}
return nil
},
}
}

View file

@ -0,0 +1,109 @@
package rabbitmq
import (
"fmt"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
"github.com/michaelklishin/rabbit-hole"
)
func pathConfigConnection(b *backend) *framework.Path {
return &framework.Path{
Pattern: "config/connection",
Fields: map[string]*framework.FieldSchema{
"uri": &framework.FieldSchema{
Type: framework.TypeString,
Description: "RabbitMQ Management URI",
},
"username": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Username of a RabbitMQ management administrator",
},
"password": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Password of the provided RabbitMQ management user",
},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.WriteOperation: b.pathConnectionWrite,
},
HelpSynopsis: pathConfigConnectionHelpSyn,
HelpDescription: pathConfigConnectionHelpDesc,
}
}
func (b *backend) pathConnectionWrite(req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
uri := data.Get("uri").(string)
username := data.Get("username").(string)
password := data.Get("password").(string)
if uri == "" {
return logical.ErrorResponse(fmt.Sprintf(
"'uri' is a required parameter.")), nil
}
if username == "" {
return logical.ErrorResponse(fmt.Sprintf(
"'username' is a required parameter.")), nil
}
if password == "" {
return logical.ErrorResponse(fmt.Sprintf(
"'password' is a required parameter.")), nil
}
// Verify the string
client, err := rabbithole.NewClient(uri, username, password)
if err != nil {
return logical.ErrorResponse(fmt.Sprintf(
"Error validating connection info: %s", err)), nil
}
_, err = client.ListUsers()
if err != nil {
return logical.ErrorResponse(fmt.Sprintf(
"Error validating connection info by listing users: %s", err)), nil
}
// Store it
entry, err := logical.StorageEntryJSON("config/connection", connectionConfig{
URI: uri,
Username: username,
Password: password,
})
if err != nil {
return nil, err
}
if err := req.Storage.Put(entry); err != nil {
return nil, err
}
// Reset the client connection
b.ResetClient()
return nil, nil
}
type connectionConfig struct {
URI string `json:"uri"`
Username string `json:"username"`
Password string `json:"password"`
}
const pathConfigConnectionHelpSyn = `
Configure the URI, username, and password to talk to RabbitMQ management HTTP API.
`
const pathConfigConnectionHelpDesc = `
This path configures the connection properties used to connect to RabbitMQ management HTTP API.
The "uri" parameter is a string that is be used to connect to the API. The "username"
and "password" parameters are strings and used as credentials to the API.
The URI looks like:
"http://localhost:15672"
When configuring the URI, username, and password, the backend will verify their validity.
`

View file

@ -0,0 +1,103 @@
package rabbitmq
import (
"fmt"
"time"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
)
func pathConfigLease(b *backend) *framework.Path {
return &framework.Path{
Pattern: "config/lease",
Fields: map[string]*framework.FieldSchema{
"lease": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Default lease for roles.",
},
"lease_max": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Maximum time a credential is valid for.",
},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.ReadOperation: b.pathLeaseRead,
logical.WriteOperation: b.pathLeaseWrite,
},
HelpSynopsis: pathConfigLeaseHelpSyn,
HelpDescription: pathConfigLeaseHelpDesc,
}
}
func (b *backend) pathLeaseWrite(
req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
leaseRaw := d.Get("lease").(string)
leaseMaxRaw := d.Get("lease_max").(string)
lease, err := time.ParseDuration(leaseRaw)
if err != nil {
return logical.ErrorResponse(fmt.Sprintf(
"Invalid lease: %s", err)), nil
}
leaseMax, err := time.ParseDuration(leaseMaxRaw)
if err != nil {
return logical.ErrorResponse(fmt.Sprintf(
"Invalid lease: %s", err)), nil
}
// Store it
entry, err := logical.StorageEntryJSON("config/lease", &configLease{
Lease: lease,
LeaseMax: leaseMax,
})
if err != nil {
return nil, err
}
if err := req.Storage.Put(entry); err != nil {
return nil, err
}
return nil, nil
}
func (b *backend) pathLeaseRead(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
lease, err := b.Lease(req.Storage)
if err != nil {
return nil, err
}
if lease == nil {
return nil, nil
}
return &logical.Response{
Data: map[string]interface{}{
"lease": lease.Lease.String(),
"lease_max": lease.LeaseMax.String(),
},
}, nil
}
type configLease struct {
Lease time.Duration
LeaseMax time.Duration
}
const pathConfigLeaseHelpSyn = `
Configure the default lease information for generated credentials.
`
const pathConfigLeaseHelpDesc = `
This configures the default lease information used for credentials
generated by this backend. The lease specifies the duration that a
credential will be valid for, as well as the maximum session for
a set of credentials.
The format for the lease is "1h" or integer and then unit. The longest
unit is hour.
`

View file

@ -0,0 +1,112 @@
package rabbitmq
import (
"fmt"
"time"
"github.com/hashicorp/uuid"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
"github.com/michaelklishin/rabbit-hole"
)
func pathRoleCreate(b *backend) *framework.Path {
return &framework.Path{
Pattern: "creds/" + framework.GenericNameRegex("name"),
Fields: map[string]*framework.FieldSchema{
"name": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Name of the role.",
},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.ReadOperation: b.pathRoleCreateRead,
},
HelpSynopsis: pathRoleCreateReadHelpSyn,
HelpDescription: pathRoleCreateReadHelpDesc,
}
}
func (b *backend) pathRoleCreateRead(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
name := data.Get("name").(string)
// Get the role
role, err := b.Role(req.Storage, name)
if err != nil {
return nil, err
}
if role == nil {
return logical.ErrorResponse(fmt.Sprintf("unknown role: %s", name)), nil
}
// Determine if we have a lease
lease, err := b.Lease(req.Storage)
if err != nil {
return nil, err
}
if lease == nil {
lease = &configLease{Lease: 1 * time.Hour}
}
// Generate the username, password and expiration. PG limits user to 63 characters
displayName := req.DisplayName
if len(displayName) > 26 {
displayName = displayName[:26]
}
username := fmt.Sprintf("%s-%s", displayName, uuid.GenerateUUID())
if len(username) > 63 {
username = username[:63]
}
password := uuid.GenerateUUID()
// Get our connection
client, err := b.Client(req.Storage)
if err != nil {
return nil, err
}
// Create the user
_, err = client.PutUser(username, rabbithole.UserSettings{
Password: password,
Tags: role.Tags,
})
if err != nil {
return nil, err
}
for vhost, permission := range role.VHosts {
_, err := client.UpdatePermissionsIn(vhost, username, rabbithole.Permissions{
Configure: permission.Configure,
Write: permission.Write,
Read: permission.Read,
})
if err != nil {
return nil, err
}
}
// Return the secret
resp := b.Secret(SecretCredsType).Response(map[string]interface{}{
"username": username,
"password": password,
}, map[string]interface{}{
"username": username,
})
resp.Secret.TTL = lease.Lease
return resp, nil
}
const pathRoleCreateReadHelpSyn = `
Request RabbitMQ credentials for a certain role.
`
const pathRoleCreateReadHelpDesc = `
This path reads RabbitMQ credentials for a certain role. The
RabbitMQ credentials will be generated on demand and will be automatically
revoked when the lease is up.
`

View file

@ -0,0 +1,150 @@
package rabbitmq
import (
"encoding/json"
"fmt"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
)
func pathRoles(b *backend) *framework.Path {
return &framework.Path{
Pattern: "roles/" + framework.GenericNameRegex("name"),
Fields: map[string]*framework.FieldSchema{
"name": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Name of the role.",
},
"tags": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Comma-separated list of tags for this role.",
},
"vhosts": &framework.FieldSchema{
Type: framework.TypeString,
Description: "A map of virtual hosts to permissions.",
},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.ReadOperation: b.pathRoleRead,
logical.WriteOperation: b.pathRoleCreate,
logical.DeleteOperation: b.pathRoleDelete,
},
HelpSynopsis: pathRoleHelpSyn,
HelpDescription: pathRoleHelpDesc,
}
}
func (b *backend) Role(s logical.Storage, n string) (*roleEntry, error) {
entry, err := s.Get("role/" + n)
if err != nil {
return nil, err
}
if entry == nil {
return nil, nil
}
var result roleEntry
if err := entry.DecodeJSON(&result); err != nil {
return nil, err
}
return &result, nil
}
func (b *backend) pathRoleDelete(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
err := req.Storage.Delete("role/" + data.Get("name").(string))
if err != nil {
return nil, err
}
return nil, nil
}
func (b *backend) pathRoleRead(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
role, err := b.Role(req.Storage, data.Get("name").(string))
if err != nil {
return nil, err
}
if role == nil {
return nil, nil
}
return &logical.Response{
Data: map[string]interface{}{
"tags": role.Tags,
"vhosts": role.VHosts,
},
}, nil
}
func (b *backend) pathRoleCreate(
req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
name := data.Get("name").(string)
tags := data.Get("tags").(string)
rawVHosts := data.Get("vhosts").(string)
var vhosts map[string]vhostPermission
if len(rawVHosts) > 0 {
err := json.Unmarshal([]byte(rawVHosts), &vhosts)
if err != nil {
return logical.ErrorResponse(fmt.Sprintf("failed to unmarshal vhosts: %s", err)), nil
}
}
// Store it
entry, err := logical.StorageEntryJSON("role/"+name, &roleEntry{
Tags: tags,
VHosts: vhosts,
})
if err != nil {
return nil, err
}
if err := req.Storage.Put(entry); err != nil {
return nil, err
}
return nil, nil
}
type roleEntry struct {
Tags string `json:"tags"`
VHosts map[string]vhostPermission `json:"vhosts"`
}
type vhostPermission struct {
Configure string `json:"configure"`
Write string `json:"write"`
Read string `json:"read"`
}
const pathRoleHelpSyn = `
Manage the roles that can be created with this backend.
`
const pathRoleHelpDesc = `
This path lets you manage the roles that can be created with this backend.
The "tags" parameter customizes the tags used to create the role.
This is a comma separated list of strings. The "vhosts" parameter customizes
the virtual hosts that this user will be associated with. This is a JSON object
passed as a string in the form:
{
"vhostOne": {
"configure": ".*",
"write": ".*",
"read": ".*"
},
"vhostTwo": {
"configure": ".*",
"write": ".*",
"read": ".*"
}
}
`

View file

@ -0,0 +1,78 @@
package rabbitmq
import (
"fmt"
"time"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/logical/framework"
)
// SecretCredsType is the key for this backend's secrets.
const SecretCredsType = "creds"
func secretCreds(b *backend) *framework.Secret {
return &framework.Secret{
Type: SecretCredsType,
Fields: map[string]*framework.FieldSchema{
"username": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Username",
},
"password": &framework.FieldSchema{
Type: framework.TypeString,
Description: "Password",
},
},
DefaultDuration: 1 * time.Hour,
DefaultGracePeriod: 10 * time.Minute,
Renew: b.secretCredsRenew,
Revoke: b.secretCredsRevoke,
}
}
func (b *backend) secretCredsRenew(
req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
// Get the lease information
lease, err := b.Lease(req.Storage)
if err != nil {
return nil, err
}
if lease == nil {
lease = &configLease{Lease: 1 * time.Hour}
}
f := framework.LeaseExtend(lease.Lease, lease.LeaseMax, false)
resp, err := f(req, d)
if err != nil {
return nil, err
}
return resp, nil
}
func (b *backend) secretCredsRevoke(
req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
// Get the username from the internal data
usernameRaw, ok := req.Secret.InternalData["username"]
if !ok {
return nil, fmt.Errorf("secret is missing username internal data")
}
username, ok := usernameRaw.(string)
// Get our connection
client, err := b.Client(req.Storage)
if err != nil {
return nil, err
}
_, err = client.DeleteUser(username)
if err != nil {
return logical.ErrorResponse(fmt.Sprintf("could not delete user: %s", err)), nil
}
return nil, nil
}

View file

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/vault/builtin/logical/mysql" "github.com/hashicorp/vault/builtin/logical/mysql"
"github.com/hashicorp/vault/builtin/logical/pki" "github.com/hashicorp/vault/builtin/logical/pki"
"github.com/hashicorp/vault/builtin/logical/postgresql" "github.com/hashicorp/vault/builtin/logical/postgresql"
"github.com/hashicorp/vault/builtin/logical/rabbitmq"
"github.com/hashicorp/vault/builtin/logical/ssh" "github.com/hashicorp/vault/builtin/logical/ssh"
"github.com/hashicorp/vault/builtin/logical/transit" "github.com/hashicorp/vault/builtin/logical/transit"
@ -76,6 +77,7 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
"transit": transit.Factory, "transit": transit.Factory,
"mysql": mysql.Factory, "mysql": mysql.Factory,
"ssh": ssh.Factory, "ssh": ssh.Factory,
"rabbitmq": rabbitmq.Factory,
}, },
ShutdownCh: makeShutdownCh(), ShutdownCh: makeShutdownCh(),
}, nil }, nil

View file

@ -0,0 +1,326 @@
---
layout: "docs"
page_title: "Secret Backend: RabbitMQ"
sidebar_current: "docs-secrets-rabbitmq"
description: |-
The RabbitMQ secret backend for Vault generates user credentials to access RabbitMQ.
---
# RabbitMQ Secret Backend
Name: `rabbitmq`
The RabbitMQ secret backend for Vault generates user credentials
dynamically based on configured permissions and virtual hosts. This means that
services that need to access a virtual host no longer need to hardcode credentials:
they can request them from Vault, and use Vault's leasing mechanism to more easily roll users.
Additionally, it introduces a new ability: with every service accessing
the messaging queue with unique credentials, it makes auditing much easier when
questionable data access is discovered: you can track it down to the specific
instance of a service based on the RabbitMQ username.
// TODO: Fix this
Vault makes use both of its own internal revocation system as well as the
deleting RabbitMQ users when creating RabbitMQ users to ensure that users
become invalid within a reasonable time of the lease expiring.
This page will show a quick start for this backend. For detailed documentation
on every path, use `vault path-help` after mounting the backend.
## Quick Start
The first step to using the PostgreSQL backend is to mount it.
Unlike the `generic` backend, the `rabbitmq` backend is not mounted by default.
```text
$ vault mount rabbitmq
Successfully mounted 'rabbitmq' at 'rabbitmq'!
```
Next, Vault must be configured to connect to the RabbitMQ. This is done by
writing the RabbitMQ management URI, RabbitMQ management administrator user, and
the user's password.
```text
$ vault write rabbitmq/config/connection \
uri="http://localhost:15672" \
username="admin" \
password="password"
```
In this case, we've configured Vault with the URI "http://localhost:15672", user "admin",
and password "password" connecting to a local RabbitMQ management instance. It is important
that the Vault user have the administrator privilege to manager users.
Optionally, we can configure the lease settings for credentials generated
by Vault. This is done by writing to the `config/lease` key:
```
$ vault write postgresql/config/lease lease=1h lease_max=24h
Success! Data written to: postgresql/config/lease
```
This restricts each credential to being valid or leased for 1 hour
at a time, with a maximum use period of 24 hours. This forces an
application to renew their credentials at least hourly, and to recycle
them once per day.
The next step is to configure a role. A role is a logical name that maps
to tags and virtual host permissions used to generated those credentials. For example,
lets create a "readwrite" virtual host role:
```text
$ vault write rabbitmq/roles/readwrite \
vhosts='{"/":{"write": ".*", "read": ".*"}}'
Success! Data written to: rabbitmq/roles/readonly
```
By writing to the `roles/readwrite` path we are defining the `readwrite` role.
This role will be created by evaluating the given `vhosts` and `tags` statements.
By default, no tags and no virtual hosts are assigned to a role. You can read more
about RabbitMQ management tags [here](https://www.rabbitmq.com/management.html#permissions).
Configure, write, and read permissions are granted per virtual host.
To generate a new set of credentials, we simply read from that role:
Vault is now configured to create and manage credentials for RabbitMQ!
```text
$ vault read rabbitmq/creds/readwrite
lease_id rabbitmq/creds/readwrite/2740df96-d1c2-7140-c406-77a137fa3ecf
lease_duration 3600
lease_renewable true
password e1b6c159-ca63-4c6a-3886-6639eae06c30
username root-4b95bf47-281d-dcb5-8a60-9594f8056092
```
By reading from the `creds/readwrite` path, Vault has generated a new
set of credentials using the `readwrite` role configuration. Here we
see the dynamically generated username and password, along with a one
hour lease.
Using ACLs, it is possible to restrict using the rabbitmq backend such
that trusted operators can manage the role definitions, and both
users and applications are restricted in the credentials they are
allowed to read.
If you get stuck at any time, simply run `vault path-help rabbitmq` or with a
subpath for interactive help output.
## API
### /rabbitmq/config/connection
#### POST
<dl class="api">
<dt>Description</dt>
<dd>
Configures the connection string used to communicate with RabbitMQ.
This is a root protected endpoint.
</dd>
<dt>Method</dt>
<dd>POST</dd>
<dt>URL</dt>
<dd>`/RabbitMQ/config/connection`</dd>
<dt>Parameters</dt>
<dd>
<ul>
<li>
<span class="param">uri</span>
<span class="param-flags">required</span>
The RabbitMQ management connection URI. e.g. "http://localhost:15672"
</li
<li>
<span class="param">username</span>
<span class="param-flags">required</span>
The RabbitMQ management administrator username. e.g. "admin"
</li>
<li>
<span class="param">password</span>
<span class="param-flags">required</span>
The RabbitMQ management administrator password. e.g. "password"
</li>
</ul>
</dd>
<dt>Returns</dt>
<dd>
A `204` response code.
</dd>
</dl>
### /postgresql/config/lease
#### POST
<dl class="api">
<dt>Description</dt>
<dd>
Configures the lease settings for generated credentials.
If not configured, leases default to 1 hour. This is a root
protected endpoint.
</dd>
<dt>Method</dt>
<dd>POST</dd>
<dt>URL</dt>
<dd>`/rabbitmq/config/lease`</dd>
<dt>Parameters</dt>
<dd>
<ul>
<li>
<span class="param">lease</span>
<span class="param-flags">required</span>
The lease value provided as a string duration
with time suffix. Hour is the largest suffix.
</li>
<li>
<span class="param">lease_max</span>
<span class="param-flags">required</span>
The maximum lease value provided as a string duration
with time suffix. Hour is the largest suffix.
</li>
</ul>
</dd>
<dt>Returns</dt>
<dd>
A `204` response code.
</dd>
</dl>
### /rabbitmq/roles/
#### POST
<dl class="api">
<dt>Description</dt>
<dd>
Creates or updates the role definition.
</dd>
<dt>Method</dt>
<dd>POST</dd>
<dt>URL</dt>
<dd>`/rabbitmq/roles/<name>`</dd>
<dt>Parameters</dt>
<dd>
<ul>
<li>
<span class="param">tags</span>
<span class="param-flags">optional</span>
Comma-separated RabbitMQ management tags.
</li>
<li>
<span class="param">vhost</span>
<span class="param-flags">optional</span>
A map of virtual hosts to permissions.
</li>
</ul>
</dd>
<dt>Returns</dt>
<dd>
A `204` response code.
</dd>
</dl>
#### GET
<dl class="api">
<dt>Description</dt>
<dd>
Queries the role definition.
</dd>
<dt>Method</dt>
<dd>GET</dd>
<dt>URL</dt>
<dd>`/rabbitmq/roles/<name>`</dd>
<dt>Parameters</dt>
<dd>
None
</dd>
<dt>Returns</dt>
<dd>
```javascript
{
"data": {
"tags": "",
"vhost": "{\"/\": {\"configure\:".*", \"write\:".*", \"read\": \".*\"}}"
}
}
```
</dd>
</dl>
#### DELETE
<dl class="api">
<dt>Description</dt>
<dd>
Deletes the role definition.
</dd>
<dt>Method</dt>
<dd>DELETE</dd>
<dt>URL</dt>
<dd>`/rabbitmq/roles/<name>`</dd>
<dt>Parameters</dt>
<dd>
None
</dd>
<dt>Returns</dt>
<dd>
A `204` response code.
</dd>
</dl>
### /rabbitmq/creds/
#### GET
<dl class="api">
<dt>Description</dt>
<dd>
Generates a new set of dynamic credentials based on the named role.
</dd>
<dt>Method</dt>
<dd>GET</dd>
<dt>URL</dt>
<dd>`/postgresql/creds/<name>`</dd>
<dt>Parameters</dt>
<dd>
None
</dd>
<dt>Returns</dt>
<dd>
```javascript
{
"data": {
"username": "root-4b95bf47-281d-dcb5-8a60-9594f8056092",
"password": "e1b6c159-ca63-4c6a-3886-6639eae06c30"
}
}
```
</dd>
</dl>