Create Missing Column Families

Summary: Provide an convenience option to create column families if they are missing from the DB. Task #4460490

Test Plan: added unit test. also, stress test for some time

Reviewers: sdong, haobo, dhruba, ljin, yhchiang

Reviewed By: yhchiang

Subscribers: yhchiang, leveldb

Differential Revision: https://reviews.facebook.net/D18951
This commit is contained in:
Igor Canadi 2014-06-06 18:04:56 -07:00
parent 99d3eed2fd
commit a0191c9dfe
5 changed files with 45 additions and 17 deletions

View File

@ -970,6 +970,16 @@ TEST(ColumnFamilyTest, FlushStaleColumnFamilies) {
Close();
}
TEST(ColumnFamilyTest, CreateMissingColumnFamilies) {
Status s = TryOpen({"one", "two"});
ASSERT_TRUE(!s.ok());
db_options_.create_missing_column_families = true;
s = TryOpen({"default", "one", "two"});
printf("%s\n", s.ToString().c_str());
ASSERT_TRUE(s.ok());
Close();
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -4559,12 +4559,26 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
for (auto cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (cfd == nullptr) {
s = Status::InvalidArgument("Column family not found: ", cf.name);
break;
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
} else {
if (db_options.create_missing_column_families) {
// missing column family, create it
ColumnFamilyHandle* handle;
impl->mutex_.Unlock();
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
impl->mutex_.Lock();
if (s.ok()) {
handles->push_back(handle);
} else {
break;
}
} else {
s = Status::InvalidArgument("Column family not found: ", cf.name);
break;
}
}
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
}
if (s.ok()) {

View File

@ -595,6 +595,10 @@ struct DBOptions {
// Default: false
bool create_if_missing;
// If true, missing column families will be automatically created.
// Default: false
bool create_missing_column_families;
// If true, an error is raised if the database already exists.
// Default: false
bool error_if_exists;

View File

@ -1677,19 +1677,15 @@ class StressTest {
}
cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
}
while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
std::string name = std::to_string(new_column_family_name_.load());
new_column_family_name_++;
cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
column_family_names_.push_back(name);
}
options_.create_missing_column_families = true;
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
if (s.ok()) {
while (s.ok() &&
column_families_.size() < (size_t)FLAGS_column_families) {
ColumnFamilyHandle* cf = nullptr;
std::string name = std::to_string(new_column_family_name_.load());
new_column_family_name_++;
s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), name, &cf);
column_families_.push_back(cf);
column_family_names_.push_back(name);
}
}
assert(!s.ok() || column_families_.size() ==
static_cast<size_t>(FLAGS_column_families));
} else {

View File

@ -158,6 +158,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
DBOptions::DBOptions()
: create_if_missing(false),
create_missing_column_families(false),
error_if_exists(false),
paranoid_checks(true),
env(Env::Default()),
@ -197,6 +198,7 @@ DBOptions::DBOptions()
DBOptions::DBOptions(const Options& options)
: create_if_missing(options.create_if_missing),
create_missing_column_families(options.create_missing_column_families),
error_if_exists(options.error_if_exists),
paranoid_checks(options.paranoid_checks),
env(options.env),
@ -259,9 +261,11 @@ void DBOptions::Dump(Logger* log) const {
Log(log, " Options.allow_os_buffer: %d", allow_os_buffer);
Log(log, " Options.allow_mmap_reads: %d", allow_mmap_reads);
Log(log, " Options.allow_mmap_writes: %d", allow_mmap_writes);
Log(log, " Options.create_missing_column_families: %d",
create_missing_column_families);
Log(log, " Options.db_log_dir: %s",
db_log_dir.c_str());
Log(log, " Options.wal_dir: %s",
Log(log, " Options.wal_dir: %s",
wal_dir.c_str());
Log(log, " Options.table_cache_numshardbits: %d",
table_cache_numshardbits);