2016-01-20 00:00:09 +00:00
|
|
|
package physical
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
|
|
|
"fmt"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics"
|
2016-01-22 16:47:02 +00:00
|
|
|
"github.com/lib/pq"
|
2016-01-20 00:00:09 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// PostgreSQLBackend is a physical backend that stores data
// within a PostgreSQL database.
type PostgreSQLBackend struct {
	// table is the name of the table holding the entries. The constructor in
	// this file stores it already quoted via pq.QuoteIdentifier.
	table string
	// client is the database handle used to prepare the statements.
	client *sql.DB
	// statements maps a short operation name ("put", "get", "delete",
	// "list") to its prepared statement.
	statements map[string]*sql.Stmt
}
|
|
|
|
|
|
|
|
// newPostgreSQLBackend constructs a PostgreSQL backend using the given
|
|
|
|
// API client, server address, credentials, and database.
|
|
|
|
func newPostgreSQLBackend(conf map[string]string) (Backend, error) {
|
|
|
|
// Get the PostgreSQL credentials to perform read/write operations.
|
|
|
|
connURL, ok := conf["connection_url"]
|
|
|
|
if !ok || connURL == "" {
|
|
|
|
return nil, fmt.Errorf("missing connection_url")
|
|
|
|
}
|
|
|
|
|
2016-01-22 16:47:02 +00:00
|
|
|
unquoted_table, ok := conf["table"]
|
2016-01-20 00:00:09 +00:00
|
|
|
if !ok {
|
2016-01-28 00:15:48 +00:00
|
|
|
unquoted_table = "vault_kv_store"
|
2016-01-20 17:47:54 +00:00
|
|
|
}
|
2016-01-22 16:47:02 +00:00
|
|
|
quoted_table := pq.QuoteIdentifier(unquoted_table)
|
2016-01-20 17:47:54 +00:00
|
|
|
|
2016-01-21 01:52:49 +00:00
|
|
|
// Create PostgreSQL handle for the database.
|
|
|
|
db, err := sql.Open("postgres", connURL)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to connect to postgres: %v", err)
|
|
|
|
}
|
2016-01-20 17:47:54 +00:00
|
|
|
|
2016-01-21 01:52:49 +00:00
|
|
|
// Determine if we should use an upsert function (versions < 9.5)
|
|
|
|
var upsert_required bool
|
|
|
|
upsert_required_query := "SELECT string_to_array(setting, '.')::int[] < '{9,5}' FROM pg_settings WHERE name = 'server_version'"
|
|
|
|
if err := db.QueryRow(upsert_required_query).Scan(&upsert_required); err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to check for native upsert: %v", err)
|
|
|
|
}
|
2016-01-20 17:47:54 +00:00
|
|
|
|
2016-01-22 16:47:02 +00:00
|
|
|
// Setup our put strategy based on the presence or absence of a native
|
2016-01-23 00:15:10 +00:00
|
|
|
// upsert.
|
2016-01-21 01:52:49 +00:00
|
|
|
var put_statement string
|
|
|
|
if upsert_required {
|
2016-01-29 20:47:10 +00:00
|
|
|
put_statement = "SELECT vault_kv_put($1, $2, $3, $4)"
|
2016-01-21 01:52:49 +00:00
|
|
|
} else {
|
2016-01-29 20:47:10 +00:00
|
|
|
put_statement = "INSERT INTO " + quoted_table + " VALUES($1, $2, $3, $4)" +
|
|
|
|
" ON CONFLICT (path, key) DO " +
|
|
|
|
" UPDATE SET (parent_path, path, key, value) = ($1, $2, $3, $4)"
|
2016-01-20 00:00:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Setup the backend.
|
|
|
|
m := &PostgreSQLBackend{
|
2016-01-28 00:15:48 +00:00
|
|
|
table: quoted_table,
|
2016-01-20 00:00:09 +00:00
|
|
|
client: db,
|
|
|
|
statements: make(map[string]*sql.Stmt),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Prepare all the statements required
|
|
|
|
statements := map[string]string{
|
2016-01-20 17:47:54 +00:00
|
|
|
"put": put_statement,
|
2016-01-29 20:47:10 +00:00
|
|
|
"get": "SELECT value FROM " + quoted_table + " WHERE path = $1 AND key = $2",
|
|
|
|
"delete": "DELETE FROM " + quoted_table + " WHERE path = $1 AND key = $2",
|
2016-04-24 00:16:00 +00:00
|
|
|
"list": "SELECT key FROM " + quoted_table + " WHERE path = $1" +
|
|
|
|
"UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1",
|
2016-01-20 00:00:09 +00:00
|
|
|
}
|
|
|
|
for name, query := range statements {
|
|
|
|
if err := m.prepare(name, query); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return m, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// prepare is a helper to prepare a query for future execution
|
|
|
|
func (m *PostgreSQLBackend) prepare(name, query string) error {
|
|
|
|
stmt, err := m.client.Prepare(query)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to prepare '%s': %v", name, err)
|
|
|
|
}
|
|
|
|
m.statements[name] = stmt
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
// splitKey is a helper to split a full path key into individual
|
|
|
|
// parts: parentPath, path, key
|
|
|
|
func (m *PostgreSQLBackend) splitKey(fullPath string) (string, string, string) {
|
|
|
|
var parentPath string
|
|
|
|
var path string
|
|
|
|
|
|
|
|
pieces := strings.Split(fullPath, "/")
|
2016-04-24 00:16:00 +00:00
|
|
|
depth := len(pieces)
|
|
|
|
key := pieces[depth-1]
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
if depth == 1 {
|
|
|
|
parentPath = ""
|
2016-04-24 00:16:00 +00:00
|
|
|
path = "/"
|
2016-01-29 20:47:10 +00:00
|
|
|
} else if depth == 2 {
|
|
|
|
parentPath = "/"
|
2016-04-24 00:16:00 +00:00
|
|
|
path = "/" + pieces[0] + "/"
|
2016-01-29 20:47:10 +00:00
|
|
|
} else {
|
|
|
|
parentPath = "/" + strings.Join(pieces[:depth-2], "/") + "/"
|
2016-04-24 00:16:00 +00:00
|
|
|
path = "/" + strings.Join(pieces[:depth-1], "/") + "/"
|
2016-01-29 20:47:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return parentPath, path, key
|
|
|
|
}
|
|
|
|
|
2016-01-20 00:00:09 +00:00
|
|
|
// Put is used to insert or update an entry.
|
|
|
|
func (m *PostgreSQLBackend) Put(entry *Entry) error {
|
|
|
|
defer metrics.MeasureSince([]string{"postgres", "put"}, time.Now())
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
parentPath, path, key := m.splitKey(entry.Key)
|
|
|
|
|
|
|
|
_, err := m.statements["put"].Exec(parentPath, path, key, entry.Value)
|
2016-01-20 00:00:09 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get is used to fetch and entry.
|
2016-01-29 20:47:10 +00:00
|
|
|
func (m *PostgreSQLBackend) Get(fullPath string) (*Entry, error) {
|
2016-01-20 00:00:09 +00:00
|
|
|
defer metrics.MeasureSince([]string{"postgres", "get"}, time.Now())
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
_, path, key := m.splitKey(fullPath)
|
|
|
|
|
2016-01-20 00:00:09 +00:00
|
|
|
var result []byte
|
2016-01-29 20:47:10 +00:00
|
|
|
err := m.statements["get"].QueryRow(path, key).Scan(&result)
|
2016-01-20 00:00:09 +00:00
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ent := &Entry{
|
|
|
|
Key: key,
|
|
|
|
Value: result,
|
|
|
|
}
|
|
|
|
return ent, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete is used to permanently delete an entry
|
2016-01-29 20:47:10 +00:00
|
|
|
func (m *PostgreSQLBackend) Delete(fullPath string) error {
|
2016-01-20 00:00:09 +00:00
|
|
|
defer metrics.MeasureSince([]string{"postgres", "delete"}, time.Now())
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
_, path, key := m.splitKey(fullPath)
|
|
|
|
|
|
|
|
_, err := m.statements["delete"].Exec(path, key)
|
2016-01-20 00:00:09 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// List is used to list all the keys under a given
|
|
|
|
// prefix, up to the next prefix.
|
|
|
|
func (m *PostgreSQLBackend) List(prefix string) ([]string, error) {
|
|
|
|
defer metrics.MeasureSince([]string{"postgres", "list"}, time.Now())
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
rows, err := m.statements["list"].Query("/" + prefix)
|
2016-01-21 00:02:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer rows.Close()
|
2016-01-20 00:00:09 +00:00
|
|
|
|
|
|
|
var keys []string
|
|
|
|
for rows.Next() {
|
|
|
|
var key string
|
|
|
|
err = rows.Scan(&key)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to scan rows: %v", err)
|
|
|
|
}
|
|
|
|
|
2016-01-29 20:47:10 +00:00
|
|
|
keys = append(keys, key)
|
2016-01-20 00:00:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return keys, nil
|
|
|
|
}
|