open-vault/physical/consul.go

270 lines
6.2 KiB
Go

package physical
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"time"
"crypto/tls"
"crypto/x509"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/go-cleanhttp"
)
// ConsulBackend is a physical backend that stores data at specific
// prefix within Consul. It is used for most production situations as
// it allows Vault to run on multiple machines in a highly-available manner.
type ConsulBackend struct {
path string
client *api.Client
kv *api.KV
permitPool *PermitPool
}
// newConsulBackend constructs a Consul backend using the given API client
// and the prefix in the KV store.
func newConsulBackend(conf map[string]string) (Backend, error) {
// Get the path in Consul
path, ok := conf["path"]
if !ok {
path = "vault/"
}
// Ensure path is suffixed but not prefixed
if !strings.HasSuffix(path, "/") {
path += "/"
}
if strings.HasPrefix(path, "/") {
path = strings.TrimPrefix(path, "/")
}
// Configure the client
consulConf := api.DefaultConfig()
if addr, ok := conf["address"]; ok {
consulConf.Address = addr
}
if scheme, ok := conf["scheme"]; ok {
consulConf.Scheme = scheme
}
if token, ok := conf["token"]; ok {
consulConf.Token = token
}
if consulConf.Scheme == "https" {
tlsClientConfig, err := setupTLSConfig(conf)
if err != nil {
return nil, err
}
transport := cleanhttp.DefaultPooledTransport()
transport.MaxIdleConnsPerHost = 4
transport.TLSClientConfig = tlsClientConfig
consulConf.HttpClient.Transport = transport
}
client, err := api.NewClient(consulConf)
if err != nil {
return nil, errwrap.Wrapf("client setup failed: {{err}}", err)
}
maxParStr, ok := conf["max_parallel"]
var maxParInt int
if ok {
maxParInt, err = strconv.Atoi(maxParStr)
if err != nil {
return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err)
}
}
// Setup the backend
c := &ConsulBackend{
path: path,
client: client,
kv: client.KV(),
permitPool: NewPermitPool(maxParInt),
}
return c, nil
}
func setupTLSConfig(conf map[string]string) (*tls.Config, error) {
serverName := strings.Split(conf["address"], ":")
insecureSkipVerify := false
if _, ok := conf["tls_skip_verify"]; ok {
insecureSkipVerify = true
}
tlsClientConfig := &tls.Config{
InsecureSkipVerify: insecureSkipVerify,
ServerName: serverName[0],
}
_, okCert := conf["tls_cert_file"]
_, okKey := conf["tls_key_file"]
if okCert && okKey {
tlsCert, err := tls.LoadX509KeyPair(conf["tls_cert_file"], conf["tls_key_file"])
if err != nil {
return nil, fmt.Errorf("client tls setup failed: %v", err)
}
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
if tlsCaFile, ok := conf["tls_ca_file"]; ok {
caPool := x509.NewCertPool()
data, err := ioutil.ReadFile(tlsCaFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file: %v", err)
}
if !caPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsClientConfig.RootCAs = caPool
}
return tlsClientConfig, nil
}
// Put is used to insert or update an entry
func (c *ConsulBackend) Put(entry *Entry) error {
defer metrics.MeasureSince([]string{"consul", "put"}, time.Now())
pair := &api.KVPair{
Key: c.path + entry.Key,
Value: entry.Value,
}
c.permitPool.Acquire()
defer c.permitPool.Release()
_, err := c.kv.Put(pair, nil)
return err
}
// Get is used to fetch an entry
func (c *ConsulBackend) Get(key string) (*Entry, error) {
defer metrics.MeasureSince([]string{"consul", "get"}, time.Now())
c.permitPool.Acquire()
defer c.permitPool.Release()
pair, _, err := c.kv.Get(c.path+key, nil)
if err != nil {
return nil, err
}
if pair == nil {
return nil, nil
}
ent := &Entry{
Key: key,
Value: pair.Value,
}
return ent, nil
}
// Delete is used to permanently delete an entry
func (c *ConsulBackend) Delete(key string) error {
defer metrics.MeasureSince([]string{"consul", "delete"}, time.Now())
c.permitPool.Acquire()
defer c.permitPool.Release()
_, err := c.kv.Delete(c.path+key, nil)
return err
}
// List is used to list all the keys under a given
// prefix, up to the next prefix.
func (c *ConsulBackend) List(prefix string) ([]string, error) {
defer metrics.MeasureSince([]string{"consul", "list"}, time.Now())
scan := c.path + prefix
// The TrimPrefix call below will not work correctly if we have "//" at the
// end. This can happen in cases where you are e.g. listing the root of a
// prefix in a logical backend via "/" instead of ""
if strings.HasSuffix(scan, "//") {
scan = scan[:len(scan)-1]
}
c.permitPool.Acquire()
defer c.permitPool.Release()
out, _, err := c.kv.Keys(scan, "/", nil)
for idx, val := range out {
out[idx] = strings.TrimPrefix(val, scan)
}
return out, err
}
// Lock is used for mutual exclusion based on the given key.
func (c *ConsulBackend) LockWith(key, value string) (Lock, error) {
// Create the lock
opts := &api.LockOptions{
Key: c.path + key,
Value: []byte(value),
SessionName: "Vault Lock",
MonitorRetries: 5,
}
lock, err := c.client.LockOpts(opts)
if err != nil {
return nil, fmt.Errorf("failed to create lock: %v", err)
}
cl := &ConsulLock{
client: c.client,
key: c.path + key,
lock: lock,
}
return cl, nil
}
// DetectHostAddr is used to detect the host address by asking the Consul agent
func (c *ConsulBackend) DetectHostAddr() (string, error) {
agent := c.client.Agent()
self, err := agent.Self()
if err != nil {
return "", err
}
addr := self["Member"]["Addr"].(string)
return addr, nil
}
// ConsulLock is used to provide the Lock interface backed by Consul
type ConsulLock struct {
client *api.Client
key string
lock *api.Lock
}
func (c *ConsulLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
return c.lock.Lock(stopCh)
}
func (c *ConsulLock) Unlock() error {
return c.lock.Unlock()
}
func (c *ConsulLock) Value() (bool, string, error) {
kv := c.client.KV()
pair, _, err := kv.Get(c.key, nil)
if err != nil {
return false, "", err
}
if pair == nil {
return false, "", nil
}
held := pair.Session != ""
value := string(pair.Value)
return held, value, nil
}