Improve efficiency of create_missing_column_families, light refactor (#11920)

Summary:
In preparing some seqno_to_time_mapping improvements, I found that some of the wrap-up work for creating column families was unnecessarily repeated in the case of DB::Open with create_missing_column_families. This change fixes that (`CreateColumnFamily()` -> `CreateColumnFamilyImpl()` in `DBImpl::Open()`), motivated by avoiding repeated calls to `RegisterRecordSeqnoTimeWorker()` but with the side benefit of avoiding repeated calls to `WriteOptionsFile()` for each CF.

Also in this change:
* Add a `Status::UpdateIfOk()` function for combining statuses in a common pattern
* Rename `max_time_duration` -> `min_preserve_seconds` (include units as much as possible)
* Improved comments in several places

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11920

Test Plan: tests added / updated

Reviewed By: jaykorean

Differential Revision: D49919147

Pulled By: pdillinger

fbshipit-source-id: 3d0318c1d070c842c5331da0a5b415caedc104f1
This commit is contained in:
Peter Dillinger 2023-10-04 14:14:22 -07:00 committed by Facebook GitHub Bot
parent 40b618f234
commit 141b872bd4
11 changed files with 158 additions and 40 deletions

View file

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <atomic>
#include <string>
#include <thread>
#include <vector>
@ -27,6 +28,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/coding.h"
#include "util/defer.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators.h"
@ -2169,13 +2171,57 @@ TEST_P(ColumnFamilyTest, FlushStaleColumnFamilies) {
Close();
}
namespace {
struct CountOptionsFilesFs : public FileSystemWrapper {
explicit CountOptionsFilesFs(const std::shared_ptr<FileSystem>& t)
: FileSystemWrapper(t) {}
const char* Name() const override { return "CountOptionsFilesFs"; }
IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* r,
IODebugContext* dbg) override {
if (f.find("OPTIONS-") != std::string::npos) {
options_files_created.fetch_add(1, std::memory_order_relaxed);
}
return FileSystemWrapper::NewWritableFile(f, file_opts, r, dbg);
}
std::atomic<int> options_files_created{};
};
} // namespace
TEST_P(ColumnFamilyTest, CreateMissingColumnFamilies) {
Status s = TryOpen({"one", "two"});
ASSERT_TRUE(!s.ok());
db_options_.create_missing_column_families = true;
s = TryOpen({"default", "one", "two"});
ASSERT_TRUE(s.ok());
// Can't accidentally add CFs to an existing DB
Open();
Close();
ASSERT_FALSE(db_options_.create_missing_column_families);
ASSERT_NOK(TryOpen({"one", "two"}));
// Nor accidentally create in a new DB
Destroy();
db_options_.create_if_missing = true;
ASSERT_NOK(TryOpen({"one", "two"}));
// Only with the option (new DB case)
db_options_.create_missing_column_families = true;
// Also setup to count number of options files created (see check below)
auto my_fs =
std::make_shared<CountOptionsFilesFs>(db_options_.env->GetFileSystem());
auto my_env = std::make_unique<CompositeEnvWrapper>(db_options_.env, my_fs);
SaveAndRestore<Env*> save_restore_env(&db_options_.env, my_env.get());
ASSERT_OK(TryOpen({"default", "one", "two"}));
Close();
// An older version would write an updated options file for each column
// family created under create_missing_column_families, which would be
// quadratic I/O in the number of column families.
ASSERT_EQ(my_fs->options_files_created.load(), 1);
// Add to existing DB case
ASSERT_OK(TryOpen({"default", "one", "two", "three", "four"}));
Close();
ASSERT_EQ(my_fs->options_files_created.load(), 2);
}
TEST_P(ColumnFamilyTest, SanitizeOptions) {

View file

@ -816,34 +816,34 @@ Status DBImpl::StartPeriodicTaskScheduler() {
}
Status DBImpl::RegisterRecordSeqnoTimeWorker() {
uint64_t min_time_duration = std::numeric_limits<uint64_t>::max();
uint64_t max_time_duration = std::numeric_limits<uint64_t>::min();
uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
{
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
// preserve time is the max of 2 options.
uint64_t preserve_time_duration =
uint64_t preserve_seconds =
std::max(cfd->ioptions()->preserve_internal_time_seconds,
cfd->ioptions()->preclude_last_level_data_seconds);
if (!cfd->IsDropped() && preserve_time_duration > 0) {
min_time_duration = std::min(preserve_time_duration, min_time_duration);
max_time_duration = std::max(preserve_time_duration, max_time_duration);
if (!cfd->IsDropped() && preserve_seconds > 0) {
min_preserve_seconds = std::min(preserve_seconds, min_preserve_seconds);
max_preserve_seconds = std::max(preserve_seconds, max_preserve_seconds);
}
}
if (min_time_duration == std::numeric_limits<uint64_t>::max()) {
if (min_preserve_seconds == std::numeric_limits<uint64_t>::max()) {
seqno_to_time_mapping_.Resize(0, 0);
} else {
seqno_to_time_mapping_.Resize(min_time_duration, max_time_duration);
seqno_to_time_mapping_.Resize(min_preserve_seconds, max_preserve_seconds);
}
}
uint64_t seqno_time_cadence = 0;
if (min_time_duration != std::numeric_limits<uint64_t>::max()) {
if (min_preserve_seconds != std::numeric_limits<uint64_t>::max()) {
// round up to 1 when the time_duration is smaller than
// kMaxSeqnoTimePairsPerCF
seqno_time_cadence =
(min_time_duration + SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF - 1) /
seqno_time_cadence = (min_preserve_seconds +
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF - 1) /
SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF;
}
@ -3296,14 +3296,34 @@ void DBImpl::MultiGetEntity(const ReadOptions& _read_options,
statuses, sorted_input);
}
Status DBImpl::WrapUpCreateColumnFamilies(
const std::vector<const ColumnFamilyOptions*>& cf_options) {
// NOTE: this function is skipped for create_missing_column_families and
// DB::Open, so new functionality here might need to go into Open also.
bool register_worker = false;
for (auto* opts_ptr : cf_options) {
if (opts_ptr->preserve_internal_time_seconds > 0 ||
opts_ptr->preclude_last_level_data_seconds > 0) {
register_worker = true;
break;
}
}
// Attempt both follow-up actions even if one fails
Status s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
if (register_worker) {
s.UpdateIfOk(RegisterRecordSeqnoTimeWorker());
}
return s;
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
s.UpdateIfOk(WrapUpCreateColumnFamilies({&cf_options}));
}
return s;
}
@ -3327,11 +3347,7 @@ Status DBImpl::CreateColumnFamilies(
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
s.UpdateIfOk(WrapUpCreateColumnFamilies({&cf_options}));
}
return s;
}
@ -3344,6 +3360,8 @@ Status DBImpl::CreateColumnFamilies(
size_t num_cf = column_families.size();
Status s;
bool success_once = false;
std::vector<const ColumnFamilyOptions*> cf_opts;
cf_opts.reserve(num_cf);
for (size_t i = 0; i < num_cf; i++) {
ColumnFamilyHandle* handle;
s = CreateColumnFamilyImpl(column_families[i].options,
@ -3353,13 +3371,10 @@ Status DBImpl::CreateColumnFamilies(
}
handles->push_back(handle);
success_once = true;
cf_opts.push_back(&column_families[i].options);
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
s.UpdateIfOk(WrapUpCreateColumnFamilies(cf_opts));
}
return s;
}
@ -3447,10 +3462,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
}
} // InstrumentedMutexLock l(&mutex_)
if (cf_options.preserve_internal_time_seconds > 0 ||
cf_options.preclude_last_level_data_seconds > 0) {
s = RegisterRecordSeqnoTimeWorker();
}
sv_context.Clean();
// this is outside the mutex
if (s.ok()) {

View file

@ -1823,10 +1823,15 @@ class DBImpl : public DB {
const Status CreateArchivalDirectory();
// Create a column family, without some of the follow-up work yet
Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& cf_name,
ColumnFamilyHandle** handle);
// Follow-up work to user creating a column family or (families)
Status WrapUpCreateColumnFamilies(
const std::vector<const ColumnFamilyOptions*>& cf_options);
Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
// Delete any unneeded files and stale in-memory entries.

View file

@ -2069,7 +2069,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
// missing column family, create it
ColumnFamilyHandle* handle = nullptr;
impl->mutex_.Unlock();
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
// NOTE: the work normally done in WrapUpCreateColumnFamilies will
// be done separately below.
s = impl->CreateColumnFamilyImpl(cf.options, cf.name, &handle);
impl->mutex_.Lock();
if (s.ok()) {
handles->push_back(handle);

View file

@ -42,15 +42,16 @@ class PeriodicTaskScheduler {
PeriodicTaskScheduler& operator=(const PeriodicTaskScheduler&) = delete;
PeriodicTaskScheduler& operator=(PeriodicTaskScheduler&&) = delete;
// Register a task with its default repeat period
// Register a task with its default repeat period. Thread safe call.
Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn);
// Register a task with specified repeat period. 0 is an invalid argument
// (kInvalidPeriodSec). To stop the task, please use Unregister() specifically
// (kInvalidPeriodSec). To stop the task, please use Unregister().
// Thread safe call.
Status Register(PeriodicTaskType task_type, const PeriodicTaskFunc& fn,
uint64_t repeat_period_seconds);
// Unregister the task
// Unregister the task. Thread safe call.
Status Unregister(PeriodicTaskType task_type);
#ifndef NDEBUG
@ -105,4 +106,3 @@ class PeriodicTaskScheduler {
};
} // namespace ROCKSDB_NAMESPACE

View file

@ -362,6 +362,10 @@ class DB {
// Create a column_family and return the handle of column family
// through the argument handle.
// NOTE: creating many column families one-by-one is not recommended because
// of quadratic overheads, such as writing a full OPTIONS file for all CFs
// after each new CF creation. Use CreateColumnFamilies(), or DB::Open() with
// create_missing_column_families=true.
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle);

View file

@ -485,7 +485,8 @@ struct DBOptions {
// Default: false
bool create_if_missing = false;
// If true, missing column families will be automatically created.
// If true, missing column families will be automatically created on
// DB::Open().
// Default: false
bool create_missing_column_families = false;

View file

@ -151,6 +151,25 @@ class Status {
return state_.get();
}
// Override this status with another, unless this status is already non-ok.
// Returns *this. Thus, the result of `a.UpdateIfOk(b).UpdateIfOk(c)` is
// non-ok (and `a` modified as such) iff any input was non-ok, with
// left-most taking precedence as far as the details.
Status& UpdateIfOk(Status&& s) {
if (code() == kOk) {
*this = std::move(s);
} else {
// Alright to ignore that status as long as this one is checked
s.PermitUncheckedError();
}
MustCheck();
return *this;
}
Status& UpdateIfOk(const Status& s) {
return UpdateIfOk(std::forward<Status>(Status(s)));
}
// Return a success status.
static Status OK() { return Status(); }

View file

@ -180,4 +180,3 @@ void LDBTool::Run(int argc, char** argv, Options options,
exit(error_code);
}
} // namespace ROCKSDB_NAMESPACE

View file

@ -0,0 +1 @@
Improved the I/O efficiency of DB::Open a new DB with `create_missing_column_families=true` and many column families.

View file

@ -243,6 +243,36 @@ TEST_F(SmallEnumSetTest, SmallEnumSetTest2) {
}
}
// ***************************************************************** //
// Unit test for Status
TEST(StatusTest, Update) {
const Status ok = Status::OK();
const Status inc = Status::Incomplete("blah");
const Status notf = Status::NotFound("meow");
Status s = ok;
ASSERT_TRUE(s.UpdateIfOk(Status::Corruption("bad")).IsCorruption());
ASSERT_TRUE(s.IsCorruption());
s = ok;
ASSERT_TRUE(s.UpdateIfOk(Status::OK()).ok());
ASSERT_TRUE(s.UpdateIfOk(ok).ok());
ASSERT_TRUE(s.ok());
ASSERT_TRUE(s.UpdateIfOk(inc).IsIncomplete());
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(s.UpdateIfOk(notf).IsIncomplete());
ASSERT_TRUE(s.UpdateIfOk(ok).IsIncomplete());
ASSERT_TRUE(s.IsIncomplete());
// Keeps left-most non-OK status
s = ok;
ASSERT_TRUE(
s.UpdateIfOk(Status()).UpdateIfOk(notf).UpdateIfOk(inc).IsNotFound());
ASSERT_TRUE(s.IsNotFound());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {