Correctly implement Create-/DropColumnFamilies for PessimisticTransactionDB (#10332)

Summary:
This overrides `CreateColumnFamilies` and `DropColumnFamilies` in `PessimisticTransactionDB` in order to add/remove the created column families to/from the lock manager.

Fixes https://github.com/facebook/rocksdb/issues/10322.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10332

Reviewed By: ajkr

Differential Revision: D37841079

Pulled By: riversand963

fbshipit-source-id: 854d7d9948b0089e0054a8f2875485ba44436fd2
This commit is contained in:
DaPorkchop_ 2022-07-22 08:31:22 -07:00 committed by Facebook GitHub Bot
parent 1e9bf25f61
commit 6bebe65030
3 changed files with 102 additions and 0 deletions

View File

@ -382,6 +382,51 @@ Status PessimisticTransactionDB::CreateColumnFamily(
return s;
}
Status PessimisticTransactionDB::CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = VerifyCFOptions(options);
if (!s.ok()) {
return s;
}
s = db_->CreateColumnFamilies(options, column_family_names, handles);
if (s.ok()) {
for (auto* handle : *handles) {
lock_manager_->AddColumnFamily(handle);
UpdateCFComparatorMap(handle);
}
}
return s;
}
Status PessimisticTransactionDB::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
InstrumentedMutexLock l(&column_family_mutex_);
for (auto& cf_desc : column_families) {
Status s = VerifyCFOptions(cf_desc.options);
if (!s.ok()) {
return s;
}
}
Status s = db_->CreateColumnFamilies(column_families, handles);
if (s.ok()) {
for (auto* handle : *handles) {
lock_manager_->AddColumnFamily(handle);
UpdateCFComparatorMap(handle);
}
}
return s;
}
// Let LockManager know that it can deallocate the LockMap for this
// column family.
Status PessimisticTransactionDB::DropColumnFamily(
@ -396,6 +441,20 @@ Status PessimisticTransactionDB::DropColumnFamily(
return s;
}
Status PessimisticTransactionDB::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = db_->DropColumnFamilies(column_families);
if (s.ok()) {
for (auto* handle : column_families) {
lock_manager_->RemoveColumnFamily(handle);
}
}
return s;
}
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
uint32_t cfh_id,
const std::string& key,

View File

@ -101,9 +101,21 @@ class PessimisticTransactionDB : public TransactionDB {
const std::string& column_family_name,
ColumnFamilyHandle** handle) override;
Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) override;
Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) override;
using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) override;
Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key, bool exclusive);
Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,

View File

@ -6499,6 +6499,37 @@ TEST_P(TransactionTest, OpenAndEnableU32Timestamp) {
}
}
TEST_P(TransactionTest, WriteWithBulkCreatedColumnFamilies) {
ColumnFamilyOptions cf_options;
WriteOptions write_options;
std::vector<std::string> cf_names;
std::vector<ColumnFamilyHandle*> cf_handles;
cf_names.push_back("test_cf");
ASSERT_OK(db->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
ASSERT_OK(db->Put(write_options, cf_handles[0], "foo", "bar"));
ASSERT_OK(db->DropColumnFamilies(cf_handles));
for (auto* h : cf_handles) {
delete h;
}
cf_handles.clear();
std::vector<ColumnFamilyDescriptor> cf_descriptors;
cf_descriptors.emplace_back("test_cf", ColumnFamilyOptions());
ASSERT_OK(db->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
ASSERT_OK(db->Put(write_options, cf_handles[0], "foo", "bar"));
ASSERT_OK(db->DropColumnFamilies(cf_handles));
for (auto* h : cf_handles) {
delete h;
}
cf_handles.clear();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {