Merge pull request #991 from quixoten/speedy_pg_physical
Make the PostgreSQL backend more performant
This commit is contained in:
commit
f354e9727a
|
@ -3,7 +3,6 @@ package physical
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -51,11 +50,11 @@ func newPostgreSQLBackend(conf map[string]string) (Backend, error) {
|
||||||
// upsert.
|
// upsert.
|
||||||
var put_statement string
|
var put_statement string
|
||||||
if upsert_required {
|
if upsert_required {
|
||||||
put_statement = "SELECT vault_kv_put($1, $2)"
|
put_statement = "SELECT vault_kv_put($1, $2, $3, $4)"
|
||||||
} else {
|
} else {
|
||||||
put_statement = "INSERT INTO " + quoted_table + " VALUES($1, $2)" +
|
put_statement = "INSERT INTO " + quoted_table + " VALUES($1, $2, $3, $4)" +
|
||||||
" ON CONFLICT (key) DO " +
|
" ON CONFLICT (path, key) DO " +
|
||||||
" UPDATE SET value = $2"
|
" UPDATE SET (parent_path, path, key, value) = ($1, $2, $3, $4)"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup the backend.
|
// Setup the backend.
|
||||||
|
@ -68,9 +67,10 @@ func newPostgreSQLBackend(conf map[string]string) (Backend, error) {
|
||||||
// Prepare all the statements required
|
// Prepare all the statements required
|
||||||
statements := map[string]string{
|
statements := map[string]string{
|
||||||
"put": put_statement,
|
"put": put_statement,
|
||||||
"get": "SELECT value FROM " + quoted_table + " WHERE key = $1",
|
"get": "SELECT value FROM " + quoted_table + " WHERE path = $1 AND key = $2",
|
||||||
"delete": "DELETE FROM " + quoted_table + " WHERE key = $1",
|
"delete": "DELETE FROM " + quoted_table + " WHERE path = $1 AND key = $2",
|
||||||
"list": "SELECT key FROM " + quoted_table + " WHERE key LIKE $1",
|
"list": "SELECT key FROM " + quoted_table + " WHERE path = $1" +
|
||||||
|
"UNION SELECT substr(path, length($1)+1) FROM " + quoted_table + "WHERE parent_path = $1",
|
||||||
}
|
}
|
||||||
for name, query := range statements {
|
for name, query := range statements {
|
||||||
if err := m.prepare(name, query); err != nil {
|
if err := m.prepare(name, query); err != nil {
|
||||||
|
@ -90,11 +90,37 @@ func (m *PostgreSQLBackend) prepare(name, query string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// splitKey is a helper to split a full path key into individual
|
||||||
|
// parts: parentPath, path, key
|
||||||
|
func (m *PostgreSQLBackend) splitKey(fullPath string) (string, string, string) {
|
||||||
|
var parentPath string
|
||||||
|
var path string
|
||||||
|
|
||||||
|
pieces := strings.Split(fullPath, "/")
|
||||||
|
depth := len(pieces)
|
||||||
|
key := pieces[depth-1]
|
||||||
|
|
||||||
|
if depth == 1 {
|
||||||
|
parentPath = ""
|
||||||
|
path = "/"
|
||||||
|
} else if depth == 2 {
|
||||||
|
parentPath = "/"
|
||||||
|
path = "/" + pieces[0] + "/"
|
||||||
|
} else {
|
||||||
|
parentPath = "/" + strings.Join(pieces[:depth-2], "/") + "/"
|
||||||
|
path = "/" + strings.Join(pieces[:depth-1], "/") + "/"
|
||||||
|
}
|
||||||
|
|
||||||
|
return parentPath, path, key
|
||||||
|
}
|
||||||
|
|
||||||
// Put is used to insert or update an entry.
|
// Put is used to insert or update an entry.
|
||||||
func (m *PostgreSQLBackend) Put(entry *Entry) error {
|
func (m *PostgreSQLBackend) Put(entry *Entry) error {
|
||||||
defer metrics.MeasureSince([]string{"postgres", "put"}, time.Now())
|
defer metrics.MeasureSince([]string{"postgres", "put"}, time.Now())
|
||||||
|
|
||||||
_, err := m.statements["put"].Exec(entry.Key, entry.Value)
|
parentPath, path, key := m.splitKey(entry.Key)
|
||||||
|
|
||||||
|
_, err := m.statements["put"].Exec(parentPath, path, key, entry.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -102,11 +128,13 @@ func (m *PostgreSQLBackend) Put(entry *Entry) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get is used to fetch and entry.
|
// Get is used to fetch and entry.
|
||||||
func (m *PostgreSQLBackend) Get(key string) (*Entry, error) {
|
func (m *PostgreSQLBackend) Get(fullPath string) (*Entry, error) {
|
||||||
defer metrics.MeasureSince([]string{"postgres", "get"}, time.Now())
|
defer metrics.MeasureSince([]string{"postgres", "get"}, time.Now())
|
||||||
|
|
||||||
|
_, path, key := m.splitKey(fullPath)
|
||||||
|
|
||||||
var result []byte
|
var result []byte
|
||||||
err := m.statements["get"].QueryRow(key).Scan(&result)
|
err := m.statements["get"].QueryRow(path, key).Scan(&result)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -122,10 +150,12 @@ func (m *PostgreSQLBackend) Get(key string) (*Entry, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete is used to permanently delete an entry
|
// Delete is used to permanently delete an entry
|
||||||
func (m *PostgreSQLBackend) Delete(key string) error {
|
func (m *PostgreSQLBackend) Delete(fullPath string) error {
|
||||||
defer metrics.MeasureSince([]string{"postgres", "delete"}, time.Now())
|
defer metrics.MeasureSince([]string{"postgres", "delete"}, time.Now())
|
||||||
|
|
||||||
_, err := m.statements["delete"].Exec(key)
|
_, path, key := m.splitKey(fullPath)
|
||||||
|
|
||||||
|
_, err := m.statements["delete"].Exec(path, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -137,9 +167,7 @@ func (m *PostgreSQLBackend) Delete(key string) error {
|
||||||
func (m *PostgreSQLBackend) List(prefix string) ([]string, error) {
|
func (m *PostgreSQLBackend) List(prefix string) ([]string, error) {
|
||||||
defer metrics.MeasureSince([]string{"postgres", "list"}, time.Now())
|
defer metrics.MeasureSince([]string{"postgres", "list"}, time.Now())
|
||||||
|
|
||||||
// Add the % wildcard to the prefix to do the prefix search
|
rows, err := m.statements["list"].Query("/" + prefix)
|
||||||
likePrefix := prefix + "%"
|
|
||||||
rows, err := m.statements["list"].Query(likePrefix)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -153,16 +181,8 @@ func (m *PostgreSQLBackend) List(prefix string) ([]string, error) {
|
||||||
return nil, fmt.Errorf("failed to scan rows: %v", err)
|
return nil, fmt.Errorf("failed to scan rows: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key = strings.TrimPrefix(key, prefix)
|
|
||||||
if i := strings.Index(key, "/"); i == -1 {
|
|
||||||
// Add objects only from the current 'folder'
|
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
} else {
|
|
||||||
// Add truncated 'folder' paths
|
|
||||||
keys = appendIfMissing(keys, string(key[:i+1]))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Strings(keys)
|
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -329,27 +329,32 @@ The PostgreSQL backend has the following options:
|
||||||
* `table` (optional) - The name of the table to write vault data to. Defaults
|
* `table` (optional) - The name of the table to write vault data to. Defaults
|
||||||
to "vault_kv_store".
|
to "vault_kv_store".
|
||||||
|
|
||||||
Make sure the PostgreSQL database you choose (or create) for vault storage has
|
Add the following table and index to a new or existing PostgreSQL database:
|
||||||
a table suitable for storing vault's data:
|
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
CREATE TABLE vault_kv_store (
|
CREATE TABLE vault_kv_store (
|
||||||
key TEXT PRIMARY KEY,
|
parent_path TEXT COLLATE "C" NOT NULL,
|
||||||
value BYTEA
|
path TEXT COLLATE "C",
|
||||||
|
key TEXT COLLATE "C",
|
||||||
|
value BYTEA,
|
||||||
|
CONSTRAINT pkey PRIMARY KEY (path, key)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE INDEX parent_path_idx ON vault_kv_store (parent_path);
|
||||||
```
|
```
|
||||||
|
|
||||||
If you're using a version of PostgreSQL prior to 9.5, vault will expect an
|
If you're using a version of PostgreSQL prior to 9.5, create the following
|
||||||
upsert function to exist named "vault_kv_put". The recommanded function to use
|
function:
|
||||||
for this operation is:
|
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
CREATE FUNCTION vault_kv_put(_key TEXT, _value BYTEA) RETURNS VOID AS
|
CREATE FUNCTION vault_kv_put(_parent_path TEXT, _path TEXT, _key TEXT, _value BYTEA) RETURNS VOID AS
|
||||||
$$
|
$$
|
||||||
BEGIN
|
BEGIN
|
||||||
LOOP
|
LOOP
|
||||||
-- first try to update the key
|
-- first try to update the key
|
||||||
UPDATE vault_kv_store SET value = _value WHERE key = _key;
|
UPDATE vault_kv_store
|
||||||
|
SET (parent_path, path, key, value) = (_parent_path, _path, _key, _value)
|
||||||
|
WHERE _path = path AND key = _key;
|
||||||
IF found THEN
|
IF found THEN
|
||||||
RETURN;
|
RETURN;
|
||||||
END IF;
|
END IF;
|
||||||
|
@ -357,7 +362,8 @@ BEGIN
|
||||||
-- if someone else inserts the same key concurrently,
|
-- if someone else inserts the same key concurrently,
|
||||||
-- we could get a unique-key failure
|
-- we could get a unique-key failure
|
||||||
BEGIN
|
BEGIN
|
||||||
INSERT INTO vault_kv_store (key, value) VALUES (_key, _value);
|
INSERT INTO vault_kv_store (parent_path, path, key, value)
|
||||||
|
VALUES (_parent_path, _path, _key, _value);
|
||||||
RETURN;
|
RETURN;
|
||||||
EXCEPTION WHEN unique_violation THEN
|
EXCEPTION WHEN unique_violation THEN
|
||||||
-- Do nothing, and loop to try the UPDATE again.
|
-- Do nothing, and loop to try the UPDATE again.
|
||||||
|
|
Loading…
Reference in a new issue