From 825ffd130a7c9f663d2c451a54f4bc88cbd76aa0 Mon Sep 17 00:00:00 2001 From: Joel Kenny Date: Tue, 29 Mar 2022 13:12:06 -0400 Subject: [PATCH] cockroachdb: add high-availability support (#12965) This commit adds high-availability support to the CockroachDB backend. The locking strategy implemented is heavily influenced from the very similar Postgres backend. --- changelog/12965.txt | 3 + physical/cockroachdb/cockroachdb.go | 83 ++++++++-- physical/cockroachdb/cockroachdb_ha.go | 201 +++++++++++++++++++++++ physical/cockroachdb/cockroachdb_test.go | 71 +++++--- 4 files changed, 322 insertions(+), 36 deletions(-) create mode 100644 changelog/12965.txt create mode 100644 physical/cockroachdb/cockroachdb_ha.go diff --git a/changelog/12965.txt b/changelog/12965.txt new file mode 100644 index 000000000..4045d5598 --- /dev/null +++ b/changelog/12965.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cockroachdb: add high-availability support +``` diff --git a/physical/cockroachdb/cockroachdb.go b/physical/cockroachdb/cockroachdb.go index 12439d83c..ef57a62e5 100644 --- a/physical/cockroachdb/cockroachdb.go +++ b/physical/cockroachdb/cockroachdb.go @@ -28,18 +28,23 @@ var ( ) const ( - defaultTableName = "vault_kv_store" + defaultTableName = "vault_kv_store" + defaultHATableName = "vault_ha_locks" ) // CockroachDBBackend Backend is a physical backend that stores data // within a CockroachDB database. type CockroachDBBackend struct { - table string - client *sql.DB - rawStatements map[string]string - statements map[string]*sql.Stmt - logger log.Logger - permitPool *physical.PermitPool + table string + haTable string + client *sql.DB + rawStatements map[string]string + statements map[string]*sql.Stmt + rawHAStatements map[string]string + haStatements map[string]*sql.Stmt + logger log.Logger + permitPool *physical.PermitPool + haEnabled bool } // NewCockroachDBBackend constructs a CockroachDB backend using the given @@ -51,6 +56,8 @@ func NewCockroachDBBackend(conf map[string]string, logger log.Logger) (physical. return nil, fmt.Errorf("missing connection_url") } + haEnabled := conf["ha_enabled"] == "true" + dbTable := conf["table"] if dbTable == "" { dbTable = defaultTableName @@ -61,6 +68,16 @@ func NewCockroachDBBackend(conf map[string]string, logger log.Logger) (physical. return nil, fmt.Errorf("invalid table: %w", err) } + dbHATable, ok := conf["ha_table"] + if !ok { + dbHATable = defaultHATableName + } + + err = validateDBTable(dbHATable) + if err != nil { + return nil, fmt.Errorf("invalid HA table: %w", err) + } + maxParStr, ok := conf["max_parallel"] var maxParInt int if ok { @@ -79,17 +96,30 @@ func NewCockroachDBBackend(conf map[string]string, logger log.Logger) (physical. return nil, fmt.Errorf("failed to connect to cockroachdb: %w", err) } - // Create the required table if it doesn't exists. + // Create the required tables if they don't exist. createQuery := "CREATE TABLE IF NOT EXISTS " + dbTable + " (path STRING, value BYTES, PRIMARY KEY (path))" if _, err := db.Exec(createQuery); err != nil { - return nil, fmt.Errorf("failed to create mysql table: %w", err) + return nil, fmt.Errorf("failed to create CockroachDB table: %w", err) + } + if haEnabled { + createHATableQuery := "CREATE TABLE IF NOT EXISTS " + dbHATable + + "(ha_key TEXT NOT NULL, " + + " ha_identity TEXT NOT NULL, " + + " ha_value TEXT, " + + " valid_until TIMESTAMP WITH TIME ZONE NOT NULL, " + + " CONSTRAINT ha_key PRIMARY KEY (ha_key) " + + ");" + if _, err := db.Exec(createHATableQuery); err != nil { + return nil, fmt.Errorf("failed to create CockroachDB HA table: %w", err) + } } // Setup the backend c := &CockroachDBBackend{ - table: dbTable, - client: db, + table: dbTable, + haTable: dbHATable, + client: db, rawStatements: map[string]string{ "put": "INSERT INTO " + dbTable + " VALUES($1, $2)" + " ON CONFLICT (path) DO " + @@ -99,26 +129,45 @@ func NewCockroachDBBackend(conf map[string]string, logger log.Logger) (physical. "list": "SELECT path FROM " + dbTable + " WHERE path LIKE $1", }, statements: make(map[string]*sql.Stmt), - logger: logger, - permitPool: physical.NewPermitPool(maxParInt), + rawHAStatements: map[string]string{ + "get": "SELECT ha_value FROM " + dbHATable + " WHERE NOW() <= valid_until AND ha_key = $1", + "upsert": "INSERT INTO " + dbHATable + " as t (ha_identity, ha_key, ha_value, valid_until)" + + " VALUES ($1, $2, $3, NOW() + $4) " + + " ON CONFLICT (ha_key) DO " + + " UPDATE SET (ha_identity, ha_key, ha_value, valid_until) = ($1, $2, $3, NOW() + $4) " + + " WHERE (t.valid_until < NOW() AND t.ha_key = $2) OR " + + " (t.ha_identity = $1 AND t.ha_key = $2) ", + "delete": "DELETE FROM " + dbHATable + " WHERE ha_key = $1", + }, + haStatements: make(map[string]*sql.Stmt), + logger: logger, + permitPool: physical.NewPermitPool(maxParInt), + haEnabled: haEnabled, } // Prepare all the statements required for name, query := range c.rawStatements { - if err := c.prepare(name, query); err != nil { + if err := c.prepare(c.statements, name, query); err != nil { return nil, err } } + if haEnabled { + for name, query := range c.rawHAStatements { + if err := c.prepare(c.haStatements, name, query); err != nil { + return nil, err + } + } + } return c, nil } -// prepare is a helper to prepare a query for future execution -func (c *CockroachDBBackend) prepare(name, query string) error { +// prepare is a helper to prepare a query for future execution. +func (c *CockroachDBBackend) prepare(statementMap map[string]*sql.Stmt, name, query string) error { stmt, err := c.client.Prepare(query) if err != nil { return fmt.Errorf("failed to prepare %q: %w", name, err) } - c.statements[name] = stmt + statementMap[name] = stmt return nil } diff --git a/physical/cockroachdb/cockroachdb_ha.go b/physical/cockroachdb/cockroachdb_ha.go new file mode 100644 index 000000000..5b77520f5 --- /dev/null +++ b/physical/cockroachdb/cockroachdb_ha.go @@ -0,0 +1,201 @@ +package cockroachdb + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/sdk/physical" +) + +const ( + // The lock TTL matches the default that Consul API uses, 15 seconds. + // Used as part of SQL commands to set/extend lock expiry time relative to + // database clock. + CockroachDBLockTTLSeconds = 15 + + // The amount of time to wait between the lock renewals + CockroachDBLockRenewInterval = 5 * time.Second + + // CockroachDBLockRetryInterval is the amount of time to wait + // if a lock fails before trying again. + CockroachDBLockRetryInterval = time.Second +) + +// Verify backend satisfies the correct interfaces. +var ( + _ physical.HABackend = (*CockroachDBBackend)(nil) + _ physical.Lock = (*CockroachDBLock)(nil) +) + +type CockroachDBLock struct { + backend *CockroachDBBackend + key string + value string + identity string + lock sync.Mutex + + renewTicker *time.Ticker + + // ttlSeconds is how long a lock is valid for. + ttlSeconds int + + // renewInterval is how much time to wait between lock renewals. must be << ttl. + renewInterval time.Duration + + // retryInterval is how much time to wait between attempts to grab the lock. + retryInterval time.Duration +} + +func (c *CockroachDBBackend) HAEnabled() bool { + return c.haEnabled +} + +func (c *CockroachDBBackend) LockWith(key, value string) (physical.Lock, error) { + identity, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + return &CockroachDBLock{ + backend: c, + key: key, + value: value, + identity: identity, + ttlSeconds: CockroachDBLockTTLSeconds, + renewInterval: CockroachDBLockRenewInterval, + retryInterval: CockroachDBLockRetryInterval, + }, nil +} + +// Lock tries to acquire the lock by repeatedly trying to create a record in the +// CockroachDB table. It will block until either the stop channel is closed or +// the lock could be acquired successfully. The returned channel will be closed +// once the lock in the CockroachDB table cannot be renewed, either due to an +// error speaking to CockroachDB or because someone else has taken it. +func (l *CockroachDBLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { + l.lock.Lock() + defer l.lock.Unlock() + + var ( + success = make(chan struct{}) + errors = make(chan error, 1) + leader = make(chan struct{}) + ) + go l.tryToLock(stopCh, success, errors) + + select { + case <-success: + // After acquiring it successfully, we must renew the lock periodically. + l.renewTicker = time.NewTicker(l.renewInterval) + go l.periodicallyRenewLock(leader) + case err := <-errors: + return nil, err + case <-stopCh: + return nil, nil + } + + return leader, nil +} + +// Unlock releases the lock by deleting the lock record from the +// CockroachDB table. +func (l *CockroachDBLock) Unlock() error { + c := l.backend + c.permitPool.Acquire() + defer c.permitPool.Release() + + if l.renewTicker != nil { + l.renewTicker.Stop() + } + + _, err := c.haStatements["delete"].Exec(l.key) + return err +} + +// Value checks whether or not the lock is held by any instance of CockroachDBLock, +// including this one, and returns the current value. +func (l *CockroachDBLock) Value() (bool, string, error) { + c := l.backend + c.permitPool.Acquire() + defer c.permitPool.Release() + var result string + err := c.haStatements["get"].QueryRow(l.key).Scan(&result) + + switch err { + case nil: + return true, result, nil + case sql.ErrNoRows: + return false, "", nil + default: + return false, "", err + + } +} + +// tryToLock tries to create a new item in CockroachDB every `retryInterval`. +// As long as the item cannot be created (because it already exists), it will +// be retried. If the operation fails due to an error, it is sent to the errors +// channel. When the lock could be acquired successfully, the success channel +// is closed. +func (l *CockroachDBLock) tryToLock(stop <-chan struct{}, success chan struct{}, errors chan error) { + ticker := time.NewTicker(l.retryInterval) + defer ticker.Stop() + + for { + select { + case <-stop: + return + case <-ticker.C: + gotlock, err := l.writeItem() + switch { + case err != nil: + // Send to the error channel and don't block if full. + select { + case errors <- err: + default: + } + return + case gotlock: + close(success) + return + } + } + } +} + +func (l *CockroachDBLock) periodicallyRenewLock(done chan struct{}) { + for range l.renewTicker.C { + gotlock, err := l.writeItem() + if err != nil || !gotlock { + close(done) + l.renewTicker.Stop() + return + } + } +} + +// Attempts to put/update the CockroachDB item using condition expressions to +// evaluate the TTL. Returns true if the lock was obtained, false if not. +// If false error may be nil or non-nil: nil indicates simply that someone +// else has the lock, whereas non-nil means that something unexpected happened. +func (l *CockroachDBLock) writeItem() (bool, error) { + c := l.backend + c.permitPool.Acquire() + defer c.permitPool.Release() + + sqlResult, err := c.haStatements["upsert"].Exec(l.identity, l.key, l.value, l.ttlSeconds) + if err != nil { + return false, err + } + if sqlResult == nil { + return false, fmt.Errorf("empty SQL response received") + } + + ar, err := sqlResult.RowsAffected() + if err != nil { + return false, err + } + return ar == 1, nil +} diff --git a/physical/cockroachdb/cockroachdb_test.go b/physical/cockroachdb/cockroachdb_test.go index 399b7fc61..1550b38f5 100644 --- a/physical/cockroachdb/cockroachdb_test.go +++ b/physical/cockroachdb/cockroachdb_test.go @@ -18,7 +18,8 @@ import ( type Config struct { docker.ServiceURL - TableName string + TableName string + HATableName string } var _ docker.ServiceConfig = &Config{} @@ -29,11 +30,11 @@ func prepareCockroachDBTestContainer(t *testing.T) (func(), *Config) { if err != nil { t.Fatal(err) } - tableName := os.Getenv("CR_TABLE") - if tableName == "" { - tableName = defaultTableName + return func() {}, &Config{ + ServiceURL: *s, + TableName: "vault." + defaultTableName, + HATableName: "vault." + defaultHATableName, } - return func() {}, &Config{*s, "vault." + tableName} } runner, err := docker.NewServiceRunner(docker.RunOptions{ @@ -74,14 +75,10 @@ func connectCockroachDB(ctx context.Context, host string, port int) (docker.Serv return nil, err } - tableName := os.Getenv("CR_TABLE") - if tableName == "" { - tableName = defaultTableName - } - return &Config{ - ServiceURL: *docker.NewServiceURL(u), - TableName: database + "." + tableName, + ServiceURL: *docker.NewServiceURL(u), + TableName: database + "." + defaultTableName, + HATableName: database + "." + defaultHATableName, }, nil } @@ -89,26 +86,56 @@ func TestCockroachDBBackend(t *testing.T) { cleanup, config := prepareCockroachDBTestContainer(t) defer cleanup() + hae := os.Getenv("CR_HA_ENABLED") + if hae == "" { + hae = "true" + } + // Run vault tests logger := logging.NewVaultLogger(log.Debug) - b, err := NewCockroachDBBackend(map[string]string{ + b1, err := NewCockroachDBBackend(map[string]string{ "connection_url": config.URL().String(), "table": config.TableName, + "ha_table": config.HATableName, + "ha_enabled": hae, + }, logger) + if err != nil { + t.Fatalf("Failed to create new backend: %v", err) + } + + b2, err := NewCockroachDBBackend(map[string]string{ + "connection_url": config.URL().String(), + "table": config.TableName, + "ha_table": config.HATableName, + "ha_enabled": hae, }, logger) if err != nil { t.Fatalf("Failed to create new backend: %v", err) } defer func() { - truncate(t, b) + truncate(t, b1) + truncate(t, b2) }() - physical.ExerciseBackend(t, b) - truncate(t, b) - physical.ExerciseBackend_ListPrefix(t, b) - truncate(t, b) - physical.ExerciseTransactionalBackend(t, b) + physical.ExerciseBackend(t, b1) + truncate(t, b1) + physical.ExerciseBackend_ListPrefix(t, b1) + truncate(t, b1) + physical.ExerciseTransactionalBackend(t, b1) + truncate(t, b1) + + ha1, ok1 := b1.(physical.HABackend) + ha2, ok2 := b2.(physical.HABackend) + if !ok1 || !ok2 { + t.Fatalf("CockroachDB does not implement HABackend") + } + + if ha1.HAEnabled() && ha2.HAEnabled() { + logger.Info("Running ha backend tests") + physical.ExerciseHABackend(t, ha1, ha2) + } } func truncate(t *testing.T, b physical.Backend) { @@ -117,6 +144,12 @@ func truncate(t *testing.T, b physical.Backend) { if err != nil { t.Fatalf("Failed to drop table: %v", err) } + if crdb.haEnabled { + _, err = crdb.client.Exec("TRUNCATE TABLE " + crdb.haTable) + if err != nil { + t.Fatalf("Failed to drop table: %v", err) + } + } } func TestValidateDBTable(t *testing.T) {