Merge branch 'master' of github.com:facebook/rocksdb

This commit is contained in:
Gabriela Jacques da Silva 2016-02-02 18:38:16 -08:00
commit 94be872eab
26 changed files with 809 additions and 179 deletions

View file

@ -34,7 +34,7 @@ before_script:
# as EnvPosixTest::AllocateTest expects within the Travis OpenVZ environment.
script:
- if [[ "${TRAVIS_OS_NAME}" == 'linux' ]]; then OPT=-DTRAVIS CLANG_FORMAT_DIFF=/tmp/clang-format-diff.py make format || true; fi
- OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 check
- OPT=-DTRAVIS V=1 make -j4 check && OPT=-DTRAVIS V=1 make clean jclean rocksdbjava jtest && make clean && OPT="-DTRAVIS -DROCKSDB_LITE" V=1 make -j4 static_lib
notifications:
email:

View file

@ -700,7 +700,7 @@ clean:
tags:
ctags * -R
cscope -b `find . -name '*.cc'` `find . -name '*.h'`
cscope -b `find . -name '*.cc'` `find . -name '*.h'` `find . -name '*.c'`
format:
build_tools/format-diff.sh

View file

@ -32,8 +32,16 @@ Status AutoRollLogger::ResetLogger() {
}
void AutoRollLogger::RollLogFile() {
std::string old_fname = OldInfoLogFileName(
dbname_, env_->NowMicros(), db_absolute_path_, db_log_dir_);
// This function is called when log is rotating. Two rotations
// can happen quickly (NowMicro returns same value). To not overwrite
// previous log file we increment by one micro second and try again.
uint64_t now = env_->NowMicros();
std::string old_fname;
do {
old_fname = OldInfoLogFileName(
dbname_, now, db_absolute_path_, db_log_dir_);
now++;
} while (env_->FileExists(old_fname).ok());
env_->RenameFile(log_fname_, old_fname);
}

View file

@ -239,6 +239,17 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
result.level0_slowdown_writes_trigger,
result.level0_file_num_compaction_trigger);
}
if (result.soft_pending_compaction_bytes_limit == 0) {
result.soft_pending_compaction_bytes_limit =
result.hard_pending_compaction_bytes_limit;
} else if (result.hard_pending_compaction_bytes_limit > 0 &&
result.soft_pending_compaction_bytes_limit >
result.hard_pending_compaction_bytes_limit) {
result.soft_pending_compaction_bytes_limit =
result.hard_pending_compaction_bytes_limit;
}
if (result.level_compaction_dynamic_level_bytes) {
if (result.compaction_style != kCompactionStyleLevel ||
db_options.db_paths.size() > 1U) {
@ -513,6 +524,21 @@ std::unique_ptr<WriteControllerToken> SetupDelay(
}
return write_controller->GetDelayToken(write_rate);
}
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
int level0_slowdown_writes_trigger) {
// SanitizeOptions() ensures it.
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
// 1/4 of the way between L0 compaction trigger threshold and slowdown
// condition.
// Or twice as compaction trigger, if it is smaller.
return std::min(level0_file_num_compaction_trigger * 2,
level0_file_num_compaction_trigger +
(level0_slowdown_writes_trigger -
level0_file_num_compaction_trigger) /
4);
}
} // namespace
void ColumnFamilyData::RecalculateWriteStallConditions(
@ -531,21 +557,6 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
imm()->NumNotFlushed() >=
mutable_cf_options.max_write_buffer_number - 1) {
write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller,
compaction_needed_bytes, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d "
"rate %" PRIu64,
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
@ -567,6 +578,21 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because of estimated pending compaction "
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
imm()->NumNotFlushed() >=
mutable_cf_options.max_write_buffer_number - 1) {
write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller,
compaction_needed_bytes, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d "
"rate %" PRIu64,
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) {
@ -598,6 +624,29 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64 " rate %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else if (vstorage->l0_delay_trigger_count() >=
GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_file_num_compaction_trigger,
mutable_cf_options.level0_slowdown_writes_trigger)) {
write_controller_token_ = write_controller->GetCompactionPressureToken();
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because we have %d level-0 "
"files ",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
// Increase compaction threads if bytes needed for compaction exceeds
// 1/4 of threshold for slowing down.
// If soft pending compaction byte limit is not set, always speed up
// compaction.
write_controller_token_ = write_controller->GetCompactionPressureToken();
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because of estimated pending "
"compaction "
"bytes %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
}
} else {
write_controller_token_.reset();
}

View file

@ -65,6 +65,7 @@ class ColumnFamilyTest : public testing::Test {
~ColumnFamilyTest() {
Close();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Destroy();
delete env_;
}
@ -2047,7 +2048,6 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
Close();
Destroy();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#ifndef ROCKSDB_LITE
@ -2125,7 +2125,6 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
drop_cf_thread.join();
Close();
Destroy();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
for (auto* comparator : comparators) {
if (comparator) {
delete comparator;
@ -2137,6 +2136,9 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate;
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
@ -2162,6 +2164,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(400);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2169,6 +2172,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2224,6 +2228,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2248,6 +2253,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2320,6 +2326,73 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
dbfull()->TEST_write_controler().delayed_write_rate());
}
TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
Open({"default"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
// Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 36;
mutable_cf_options.level0_stop_writes_trigger = 50;
// Speedup threshold = 200 / 4 = 50
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(45);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(6);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
// Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 16;
mutable_cf_options.level0_stop_writes_trigger = 30;
vstorage->set_l0_delay_trigger_count(5);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(7);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(3);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
}
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate;
@ -2401,6 +2474,74 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
}
TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6;
column_family_options_.soft_pending_compaction_bytes_limit = 200;
column_family_options_.hard_pending_compaction_bytes_limit = 2000;
Open();
CreateColumnFamilies({"one"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
ColumnFamilyData* cfd1 =
static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
// Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8
mutable_cf_options.level0_file_num_compaction_trigger = 4;
mutable_cf_options.level0_slowdown_writes_trigger = 36;
mutable_cf_options.level0_stop_writes_trigger = 30;
// Speedup threshold = 200 / 4 = 50
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
MutableCFOptions mutable_cf_options1 = mutable_cf_options;
mutable_cf_options1.level0_slowdown_writes_trigger = 16;
vstorage->TEST_set_estimated_compaction_needed_bytes(40);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(60);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(30);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(70);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->TEST_set_estimated_compaction_needed_bytes(20);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->TEST_set_estimated_compaction_needed_bytes(3);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(9);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage1->set_l0_delay_trigger_count(2);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(6, dbfull()->BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(0);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
}
} // namespace rocksdb
int main(int argc, char** argv) {

View file

@ -533,6 +533,104 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
TEST_F(DBCompactionTest, BGCompactionsAllowed) {
// Create several column families. Make compaction triggers in all of them
// and see number of compactions scheduled to be less than allowed.
const int kNumKeysPerFile = 100;
Options options;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
options.num_levels = 3;
// Should speed up compaction when there are 4 files.
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 20;
options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
options.base_background_compactions = 1;
options.max_background_compactions = 3;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
options = CurrentOptions(options);
// Block all threads in thread pool.
const size_t kTotalTasks = 4;
env_->SetBackgroundThreads(4, Env::LOW);
test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
for (size_t i = 0; i < kTotalTasks; i++) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_tasks[i], Env::Priority::LOW);
sleeping_tasks[i].WaitUntilSleeping();
}
CreateAndReopenWithCF({"one", "two", "three"}, options);
Random rnd(301);
for (int cf = 0; cf < 4; cf++) {
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(cf, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
// Now all column families qualify compaction but only one should be
// scheduled, because no column family hits speed up condition.
ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
// Create two more files for one column family, which triggers speed up
// condition, three compactions will be scheduled.
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(2, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(2, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
NumTableFilesAtLevel(0, 2));
}
ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
// Unblock all threads to unblock all compactions.
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}
dbfull()->TEST_WaitForCompact();
// Verify number of compactions allowed will come back to 1.
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].Reset();
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_tasks[i], Env::Priority::LOW);
sleeping_tasks[i].WaitUntilSleeping();
}
for (int cf = 0; cf < 4; cf++) {
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < kNumKeysPerFile; i++) {
ASSERT_OK(Put(cf, Key(i), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
// Now all column families qualify compaction but only one should be
// scheduled, because no column family hits speed up condition.
ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
for (size_t i = 0; i < kTotalTasks; i++) {
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}
}
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
Options options;
options.write_buffer_size = 100000000; // Large write buffer
@ -2198,6 +2296,25 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
Destroy(options);
}
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
Options options = CurrentOptions();
options.max_background_compactions = 5;
options.soft_pending_compaction_bytes_limit = 0;
options.hard_pending_compaction_bytes_limit = 100;
options.create_if_missing = true;
DestroyAndReopen(options);
ASSERT_EQ(5, db_->GetOptions().base_background_compactions);
ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);
options.base_background_compactions = 4;
options.max_background_compactions = 3;
options.soft_pending_compaction_bytes_limit = 200;
options.hard_pending_compaction_bytes_limit = 150;
DestroyAndReopen(options);
ASSERT_EQ(3, db_->GetOptions().base_background_compactions);
ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
}
// This tests for a bug that could cause two level0 compactions running
// concurrently
// TODO(aekmekji): Make sure that the reason this fails when run with

View file

@ -146,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.info_log = nullptr;
}
}
if (result.base_background_compactions == -1) {
result.base_background_compactions = result.max_background_compactions;
}
if (result.base_background_compactions > result.max_background_compactions) {
result.base_background_compactions = result.max_background_compactions;
}
result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
@ -2448,12 +2454,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
auto bg_compactions_allowed = BGCompactionsAllowed();
// special case -- if max_background_flushes == 0, then schedule flush on a
// compaction thread
if (db_options_.max_background_flushes == 0) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
db_options_.max_background_compactions) {
bg_compactions_allowed) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
@ -2466,7 +2474,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
return;
}
while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
while (bg_compaction_scheduled_ < bg_compactions_allowed &&
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
@ -2478,6 +2486,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
}
}
int DBImpl::BGCompactionsAllowed() const {
if (write_controller_.NeedSpeedupCompaction()) {
return db_options_.max_background_compactions;
} else {
return db_options_.base_background_compactions;
}
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
assert(!cfd->pending_compaction());
cfd->Ref();
@ -2590,10 +2606,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogToBuffer(
log_buffer,
"Calling FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d, compaction slots available %d",
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_,
db_options_.max_background_compactions - bg_compaction_scheduled_);
"family [%s], flush slots available %d, compaction slots allowed %d, "
"compaction slots scheduled %d",
cfd->GetName().c_str(), db_options_.max_background_flushes,
bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, log_buffer);
if (cfd->Unref()) {
@ -3311,6 +3327,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(stats_, BYTES_READ, value->size());
MeasureTime(stats_, BYTES_PER_READ, value->size());
}
return s;
}
@ -3421,6 +3438,7 @@ std::vector<Status> DBImpl::MultiGet(
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read);
PERF_TIMER_STOP(get_post_process_time);
return stat_list;
@ -4119,7 +4137,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (write_thread_.CompleteParallelWorker(&w)) {
// we're responsible for early exit
auto last_sequence = w.parallel_group->last_writer->sequence;
auto last_sequence =
w.parallel_group->last_writer->sequence +
WriteBatchInternal::Count(w.parallel_group->last_writer->batch) - 1;
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
versions_->SetLastSequence(last_sequence);
write_thread_.EarlyExitParallelGroup(&w);
@ -4305,6 +4325,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// Record statistics
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (write_options.disableWAL) {
@ -4418,7 +4439,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
this, true /*dont_filter_deletes*/,
true /*concurrent_memtable_writes*/);
assert(last_writer->sequence == last_sequence);
assert(last_writer->sequence +
WriteBatchInternal::Count(last_writer->batch) - 1 ==
last_sequence);
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
exit_completed_early = !write_thread_.CompleteParallelWorker(&w);

View file

@ -347,6 +347,10 @@ class DBImpl : public DB {
#endif // NDEBUG
// Return maximum background compaction alowed to be scheduled based on
// compaction status.
int BGCompactionsAllowed() const;
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than

View file

@ -606,6 +606,61 @@ TEST_F(DBTest, EmptyFlush) {
kSkipUniversalCompaction | kSkipMergePut));
}
// Disable because not all platform can run it.
// It requires more than 9GB memory to run it, With single allocation
// of more than 3GB.
TEST_F(DBTest, DISABLED_VeryLargeValue) {
const size_t kValueSize = 3221225472u; // 3GB value
const size_t kKeySize = 8388608u; // 8MB key
std::string raw(kValueSize, 'v');
std::string key1(kKeySize, 'c');
std::string key2(kKeySize, 'd');
Options options;
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
options.paranoid_checks = true;
options = CurrentOptions(options);
DestroyAndReopen(options);
ASSERT_OK(Put("boo", "v1"));
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put(key1, raw));
raw[0] = 'w';
ASSERT_OK(Put(key2, raw));
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(1, NumTableFilesAtLevel(0));
std::string value;
Status s = db_->Get(ReadOptions(), key1, &value);
ASSERT_OK(s);
ASSERT_EQ(kValueSize, value.size());
ASSERT_EQ('v', value[0]);
s = db_->Get(ReadOptions(), key2, &value);
ASSERT_OK(s);
ASSERT_EQ(kValueSize, value.size());
ASSERT_EQ('w', value[0]);
// Compact all files.
Flush();
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Check DB is not in read-only state.
ASSERT_OK(Put("boo", "v1"));
s = db_->Get(ReadOptions(), key1, &value);
ASSERT_OK(s);
ASSERT_EQ(kValueSize, value.size());
ASSERT_EQ('v', value[0]);
s = db_->Get(ReadOptions(), key2, &value);
ASSERT_OK(s);
ASSERT_EQ(kValueSize, value.size());
ASSERT_EQ('w', value[0]);
}
TEST_F(DBTest, GetFromImmutableLayer) {
do {
Options options;

View file

@ -85,7 +85,8 @@ bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) {
option_config == kHashCuckoo || option_config == kUniversalCompaction ||
option_config == kUniversalCompactionMultiLevel ||
option_config == kUniversalSubcompactions ||
option_config == kFIFOCompaction) {
option_config == kFIFOCompaction ||
option_config == kConcurrentSkipList) {
return true;
}
#endif
@ -361,6 +362,11 @@ Options DBTestBase::CurrentOptions(
options.max_subcompactions = 4;
break;
}
case kConcurrentSkipList: {
options.allow_concurrent_memtable_write = true;
options.enable_write_thread_adaptive_yield = true;
break;
}
default:
break;

View file

@ -525,9 +525,10 @@ class DBTestBase : public testing::Test {
kOptimizeFiltersForHits = 27,
kRowCache = 28,
kRecycleLogFiles = 29,
kLevelSubcompactions = 30,
kUniversalSubcompactions = 31,
kEnd = 30
kEnd = 30,
kConcurrentSkipList = 30,
kLevelSubcompactions = 31,
kUniversalSubcompactions = 32,
};
int option_config_;

View file

@ -187,14 +187,16 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
Put("", "");
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
Put(Key(num * 10), "val");
if (num) {
dbfull()->TEST_WaitForFlushMemTable();
}
Put(Key(30 + num * 10), "val");
Put(Key(60 + num * 10), "val");
dbfull()->TEST_WaitForFlushMemTable();
}
Put("", "");
dbfull()->TEST_WaitForFlushMemTable();
// Query set of non existing keys
for (int i = 5; i < 90; i += 10) {
@ -205,6 +207,13 @@ TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);
// Make sure bloom filter is used for all but the last L0 file when looking
// up a non-existent key that's in the range of all L0 files.
ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);
// Unblock compaction and wait it for happening.
sleeping_task_low.WakeUp();
dbfull()->TEST_WaitForCompact();

View file

@ -589,6 +589,19 @@ TEST_F(PerfContextTest, FalseDBMutexWait) {
}
}
}
TEST_F(PerfContextTest, ToString) {
perf_context.Reset();
perf_context.block_read_count = 12345;
std::string zero_included = perf_context.ToString();
ASSERT_NE(std::string::npos, zero_included.find("= 0"));
ASSERT_NE(std::string::npos, zero_included.find("= 12345"));
std::string zero_excluded = perf_context.ToString(true);
ASSERT_EQ(std::string::npos, zero_excluded.find("= 0"));
ASSERT_NE(std::string::npos, zero_excluded.find("= 12345"));
}
}
int main(int argc, char** argv) {

View file

@ -84,15 +84,11 @@ int FindFileInRange(const InternalKeyComparator& icmp,
// are MergeInProgress).
class FilePicker {
public:
FilePicker(
std::vector<FileMetaData*>* files,
const Slice& user_key,
const Slice& ikey,
autovector<LevelFilesBrief>* file_levels,
unsigned int num_levels,
FileIndexer* file_indexer,
const Comparator* user_comparator,
const InternalKeyComparator* internal_comparator)
FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
unsigned int num_levels, FileIndexer* file_indexer,
const Comparator* user_comparator,
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(-1),
hit_file_level_(-1),
@ -102,6 +98,7 @@ class FilePicker {
files_(files),
#endif
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
user_key_(user_key),
ikey_(ikey),
file_indexer_(file_indexer),
@ -126,6 +123,8 @@ class FilePicker {
// Loops over all files in current level.
FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
int cmp_largest = -1;
// Do key range filtering of files or/and fractional cascading if:
@ -209,6 +208,10 @@ class FilePicker {
// for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
unsigned int GetHitFileLevel() { return hit_file_level_; }
// Returns true if the most recent "hit file" (i.e., one returned by
// GetNextFile()) is at the last index in its level.
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
private:
unsigned int num_levels_;
unsigned int curr_level_;
@ -220,6 +223,7 @@ class FilePicker {
#endif
autovector<LevelFilesBrief>* level_files_brief_;
bool search_ended_;
bool is_hit_file_last_in_level_;
LevelFilesBrief* curr_file_level_;
unsigned int curr_index_in_curr_level_;
unsigned int start_index_in_curr_level_;
@ -903,7 +907,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
*status = table_cache_->Get(
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel())));
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()));
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -960,10 +965,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
}
bool Version::IsFilterSkipped(int level) {
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
// Reaching the bottom level implies misses at all upper levels, so we'll
// skip checking the filters when we predict a hit.
return cfd_->ioptions()->optimize_filters_for_hits && level > 0 &&
return cfd_->ioptions()->optimize_filters_for_hits &&
(level > 0 || is_file_last_in_level) &&
level == storage_info_.num_non_empty_levels() - 1;
}

View file

@ -530,7 +530,7 @@ class Version {
// checked during read operations. In certain cases (trivial move or preload),
// the filter block may already be cached, but we still do not access it such
// that it eventually expires from the cache.
bool IsFilterSkipped(int level);
bool IsFilterSkipped(int level, bool is_file_last_in_level = false);
// The helper function of UpdateAccumulatedStats, which may fill the missing
// fields of file_mata from its associated TableProperties.

View file

@ -308,6 +308,120 @@ TEST_F(WriteBatchTest, Blob) {
handler.seen);
}
// It requires more than 30GB of memory to run the test. With single memory
// allocation of more than 30GB.
// Not all platform can run it. Also it runs a long time. So disable it.
TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
// Insert key and value of 3GB and push total batch size to 12GB.
static const size_t kKeyValueSize = 4u;
static const uint32_t kNumUpdates = 3 << 30;
std::string raw(kKeyValueSize, 'A');
WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u);
char c = 'A';
for (uint32_t i = 0; i < kNumUpdates; i++) {
if (c > 'Z') {
c = 'A';
}
raw[0] = c;
raw[raw.length() - 1] = c;
c++;
batch.Put(raw, raw);
}
ASSERT_EQ(kNumUpdates, batch.Count());
struct NoopHandler : public WriteBatch::Handler {
uint32_t num_seen = 0;
char expected_char = 'A';
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
EXPECT_EQ(kKeyValueSize, key.size());
EXPECT_EQ(kKeyValueSize, value.size());
EXPECT_EQ(expected_char, key[0]);
EXPECT_EQ(expected_char, value[0]);
EXPECT_EQ(expected_char, key[kKeyValueSize - 1]);
EXPECT_EQ(expected_char, value[kKeyValueSize - 1]);
expected_char++;
if (expected_char > 'Z') {
expected_char = 'A';
}
++num_seen;
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); }
virtual bool Continue() override { return num_seen < kNumUpdates; }
} handler;
batch.Iterate(&handler);
ASSERT_EQ(kNumUpdates, handler.num_seen);
}
// The test requires more than 18GB memory to run it, with single memory
// allocation of more than 12GB. Not all the platform can run it. So disable it.
TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
// Insert key and value of 3GB and push total batch size to 12GB.
static const size_t kKeyValueSize = 3221225472u;
std::string raw(kKeyValueSize, 'A');
WriteBatch batch(12884901888u + 1024u);
for (char i = 0; i < 2; i++) {
raw[0] = 'A' + i;
raw[raw.length() - 1] = 'A' - i;
batch.Put(raw, raw);
}
ASSERT_EQ(2, batch.Count());
struct NoopHandler : public WriteBatch::Handler {
int num_seen = 0;
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
EXPECT_EQ(kKeyValueSize, key.size());
EXPECT_EQ(kKeyValueSize, value.size());
EXPECT_EQ('A' + num_seen, key[0]);
EXPECT_EQ('A' + num_seen, value[0]);
EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]);
EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]);
++num_seen;
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
EXPECT_TRUE(false);
return Status::OK();
}
virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); }
virtual bool Continue() override { return num_seen < 2; }
} handler;
batch.Iterate(&handler);
ASSERT_EQ(2, handler.num_seen);
}
TEST_F(WriteBatchTest, Continue) {
WriteBatch batch;

View file

@ -26,6 +26,13 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
}
std::unique_ptr<WriteControllerToken>
WriteController::GetCompactionPressureToken() {
++total_compaction_pressure_;
return std::unique_ptr<WriteControllerToken>(
new CompactionPressureToken(this));
}
bool WriteController::IsStopped() const { return total_stopped_ > 0; }
// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
@ -106,4 +113,9 @@ DelayWriteToken::~DelayWriteToken() {
assert(controller_->total_delayed_ >= 0);
}
CompactionPressureToken::~CompactionPressureToken() {
controller_->total_compaction_pressure_--;
assert(controller_->total_compaction_pressure_ >= 0);
}
} // namespace rocksdb

View file

@ -23,6 +23,7 @@ class WriteController {
explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
: total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
bytes_left_(0),
last_refill_time_(0) {
set_delayed_write_rate(_delayed_write_rate);
@ -38,10 +39,16 @@ class WriteController {
// which returns number of microseconds to sleep.
std::unique_ptr<WriteControllerToken> GetDelayToken(
uint64_t delayed_write_rate);
// When an actor (column family) requests a moderate token, compaction
// threads will be increased
std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
// these two metods are querying the state of the WriteController
// these three metods are querying the state of the WriteController
bool IsStopped() const;
bool NeedsDelay() const { return total_delayed_ > 0; }
bool NeedSpeedupCompaction() const {
return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
}
// return how many microseconds the caller needs to sleep after the call
// num_bytes: how many number of bytes to put into the DB.
// Prerequisite: DB mutex held.
@ -59,9 +66,11 @@ class WriteController {
friend class WriteControllerToken;
friend class StopWriteToken;
friend class DelayWriteToken;
friend class CompactionPressureToken;
int total_stopped_;
int total_delayed_;
int total_compaction_pressure_;
uint64_t bytes_left_;
uint64_t last_refill_time_;
uint64_t delayed_write_rate_;
@ -96,4 +105,11 @@ class DelayWriteToken : public WriteControllerToken {
virtual ~DelayWriteToken();
};
class CompactionPressureToken : public WriteControllerToken {
public:
explicit CompactionPressureToken(WriteController* controller)
: WriteControllerToken(controller) {}
virtual ~CompactionPressureToken();
};
} // namespace rocksdb

View file

@ -933,8 +933,19 @@ struct DBOptions {
// regardless of this setting
uint64_t delete_obsolete_files_period_micros;
// Suggested number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool.
//
// Default: max_background_compactions
int base_background_compactions;
// Maximum number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool.
// We first try to schedule compactions based on
// `base_background_compactions`. If the compaction cannot catch up , we
// will increase number of compaction threads up to
// `max_background_compactions`.
//
// If you're increasing this, also consider increasing number of threads in
// LOW priority thread pool. For more information, see
// Env::SetBackgroundThreads
@ -1110,6 +1121,9 @@ struct DBOptions {
// This option is currently honored only on Windows
//
// Default: 1 Mb
//
// Special value: 0 - means do not maintain per instance buffer. Allocate
// per request buffer and avoid locking.
size_t random_access_max_buffer_size;
// This is the maximum buffer size that is used by WritableFileWriter.

View file

@ -21,7 +21,7 @@ struct PerfContext {
void Reset(); // reset all performance counters to zero
std::string ToString() const;
std::string ToString(bool exclude_zero_counters = false) const;
uint64_t user_key_comparison_count; // total number of user key comparisons
uint64_t block_cache_hit_count; // total number of block cache hits

View file

@ -280,6 +280,10 @@ enum Histograms : uint32_t {
SST_READ_MICROS,
// The number of subcompactions actually scheduled during a compaction
NUM_SUBCOMPACTIONS_SCHEDULED,
// Value size distribution in each operation
BYTES_PER_READ,
BYTES_PER_WRITE,
BYTES_PER_MULTIGET,
HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match
};
@ -307,6 +311,9 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{WRITE_STALL, "rocksdb.db.write.stall"},
{SST_READ_MICROS, "rocksdb.sst.read.micros"},
{NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"},
{BYTES_PER_READ, "rocksdb.bytes.per.read"},
{BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
{BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"},
};
struct HistogramData {

View file

@ -766,6 +766,18 @@ class WinRandomAccessFile : public RandomAccessFile {
return read;
}
void CalculateReadParameters(uint64_t offset, size_t bytes_requested,
size_t& actual_bytes_toread,
uint64_t& first_page_start) const {
const size_t alignment = buffer_.Alignment();
first_page_start = TruncateToPageBoundary(alignment, offset);
const uint64_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
actual_bytes_toread = (last_page_start - first_page_start) + alignment;
}
public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options)
@ -797,66 +809,87 @@ class WinRandomAccessFile : public RandomAccessFile {
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
Status s;
SSIZE_T r = -1;
size_t left = n;
char* dest = scratch;
if (n == 0) {
*result = Slice(scratch, 0);
return s;
}
// When in unbuffered mode we need to do the following changes:
// - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment
if (!use_os_buffer_) {
std::unique_lock<std::mutex> lock(buffer_mut_);
// Let's see if at least some of the requested data is already
// in the buffer
if (offset >= buffered_start_ &&
uint64_t first_page_start = 0;
size_t actual_bytes_toread = 0;
size_t bytes_requested = left;
if (!read_ahead_ && random_access_max_buffer_size_ == 0) {
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
first_page_start);
assert(actual_bytes_toread > 0);
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
std::unique_lock<std::mutex> lock(buffer_mut_);
// Let's see if at least some of the requested data is already
// in the buffer
if (offset >= buffered_start_ &&
offset < (buffered_start_ + buffer_.CurrentSize())) {
size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left);
assert(r >= 0);
size_t buffer_offset = offset - buffered_start_;
r = buffer_.Read(dest, buffer_offset, left);
assert(r >= 0);
left -= size_t(r);
offset += r;
dest += r;
}
// Still some left or none was buffered
if (left > 0) {
// Figure out the start/end offset for reading and amount to read
const size_t alignment = buffer_.Alignment();
const size_t first_page_start =
TruncateToPageBoundary(alignment, offset);
size_t bytes_requested = left;
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
bytes_requested = compaction_readahead_size_;
left -= size_t(r);
offset += r;
dest += r;
}
const size_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
const size_t actual_bytes_toread =
(last_page_start - first_page_start) + alignment;
// Still some left or none was buffered
if (left > 0) {
// Figure out the start/end offset for reading and amount to read
bytes_requested = left;
if (buffer_.Capacity() < actual_bytes_toread) {
// If we are in read-ahead mode or the requested size
// exceeds max buffer size then use one-shot
// big buffer otherwise reallocate main buffer
if (read_ahead_ ||
(actual_bytes_toread > random_access_max_buffer_size_)) {
// Unlock the mutex since we are not using instance buffer
lock.unlock();
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
} else {
buffer_.AllocateNewBuffer(actual_bytes_toread);
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
if (read_ahead_ && bytes_requested < compaction_readahead_size_) {
bytes_requested = compaction_readahead_size_;
}
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread,
first_page_start);
assert(actual_bytes_toread > 0);
if (buffer_.Capacity() < actual_bytes_toread) {
// If we are in read-ahead mode or the requested size
// exceeds max buffer size then use one-shot
// big buffer otherwise reallocate main buffer
if (read_ahead_ ||
(actual_bytes_toread > random_access_max_buffer_size_)) {
// Unlock the mutex since we are not using instance buffer
lock.unlock();
r = ReadIntoOneShotBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
else {
buffer_.AllocateNewBuffer(actual_bytes_toread);
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
}
else {
buffer_.Clear();
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
} else {
buffer_.Clear();
r = ReadIntoInstanceBuffer(offset, first_page_start,
actual_bytes_toread, left, dest);
}
}
} else {
@ -1105,6 +1138,8 @@ void WinthreadCall(const char* label, std::error_code result) {
}
}
typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME);
class WinEnv : public Env {
public:
WinEnv();
@ -1643,25 +1678,29 @@ class WinEnv : public Env {
}
virtual uint64_t NowMicros() override {
// all std::chrono clocks on windows proved to return
// values that may repeat that is not good enough for some uses.
const int64_t c_UnixEpochStartTicks = 116444736000000000i64;
const int64_t c_FtToMicroSec = 10;
if (GetSystemTimePreciseAsFileTime_ != NULL) {
// all std::chrono clocks on windows proved to return
// values that may repeat that is not good enough for some uses.
const int64_t c_UnixEpochStartTicks = 116444736000000000i64;
const int64_t c_FtToMicroSec = 10;
// This interface needs to return system time and not
// just any microseconds because it is often used as an argument
// to TimedWait() on condition variable
FILETIME ftSystemTime;
GetSystemTimePreciseAsFileTime(&ftSystemTime);
// This interface needs to return system time and not
// just any microseconds because it is often used as an argument
// to TimedWait() on condition variable
FILETIME ftSystemTime;
GetSystemTimePreciseAsFileTime_(&ftSystemTime);
LARGE_INTEGER li;
li.LowPart = ftSystemTime.dwLowDateTime;
li.HighPart = ftSystemTime.dwHighDateTime;
// Subtract unix epoch start
li.QuadPart -= c_UnixEpochStartTicks;
// Convert to microsecs
li.QuadPart /= c_FtToMicroSec;
return li.QuadPart;
LARGE_INTEGER li;
li.LowPart = ftSystemTime.dwLowDateTime;
li.HighPart = ftSystemTime.dwHighDateTime;
// Subtract unix epoch start
li.QuadPart -= c_UnixEpochStartTicks;
// Convert to microsecs
li.QuadPart /= c_FtToMicroSec;
return li.QuadPart;
}
using namespace std::chrono;
return duration_cast<microseconds>(system_clock::now().time_since_epoch()).count();
}
virtual uint64_t NowNanos() override {
@ -2071,6 +2110,7 @@ class WinEnv : public Env {
std::vector<ThreadPool> thread_pools_;
mutable std::mutex mu_;
std::vector<std::thread> threads_to_join_;
FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_;
};
WinEnv::WinEnv()
@ -2079,7 +2119,15 @@ WinEnv::WinEnv()
page_size_(4 * 1012),
allocation_granularity_(page_size_),
perf_counter_frequency_(0),
thread_pools_(Priority::TOTAL) {
thread_pools_(Priority::TOTAL),
GetSystemTimePreciseAsFileTime_(NULL) {
HMODULE module = GetModuleHandle("kernel32.dll");
if (module != NULL) {
GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress(
module, "GetSystemTimePreciseAsFileTime");
}
SYSTEM_INFO sinfo;
GetSystemInfo(&sinfo);

View file

@ -229,6 +229,7 @@ DBOptions::DBOptions()
db_log_dir(""),
wal_dir(""),
delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000),
base_background_compactions(-1),
max_background_compactions(1),
max_subcompactions(1),
max_background_flushes(1),
@ -295,6 +296,7 @@ DBOptions::DBOptions(const Options& options)
wal_dir(options.wal_dir),
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
base_background_compactions(options.base_background_compactions),
max_background_compactions(options.max_background_compactions),
max_subcompactions(options.max_subcompactions),
max_background_flushes(options.max_background_flushes),
@ -383,6 +385,8 @@ void DBOptions::Dump(Logger* log) const {
table_cache_numshardbits);
Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64,
delete_obsolete_files_period_micros);
Header(log, " Options.base_background_compactions: %d",
base_background_compactions);
Header(log, " Options.max_background_compactions: %d",
max_background_compactions);
Header(log, " Options.max_subcompactions: %" PRIu32,
@ -652,6 +656,7 @@ Options::PrepareForBulkLoad()
// to L1. This is helpful so that all files that are
// input to the manual compaction are all at L0.
max_background_compactions = 2;
base_background_compactions = 2;
// The compaction would create large files in L1.
target_file_size_base = 256 * 1024 * 1024;

View file

@ -208,7 +208,7 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{offsetof(struct DBOptions, random_access_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal}},
{"writable_file_max_buffer_size",
{offsetof(struct DBOptions, writable_file_max_buffer_size),
{offsetof(struct DBOptions, writable_file_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal}},
{"use_adaptive_mutex",
{offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean,
@ -219,6 +219,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"max_background_compactions",
{offsetof(struct DBOptions, max_background_compactions), OptionType::kInt,
OptionVerificationType::kNormal}},
{"base_background_compactions",
{offsetof(struct DBOptions, base_background_compactions), OptionType::kInt,
OptionVerificationType::kNormal}},
{"max_background_flushes",
{offsetof(struct DBOptions, max_background_flushes), OptionType::kInt,
OptionVerificationType::kNormal}},

View file

@ -1669,50 +1669,7 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) {
"table_cache_numshardbits=28;"
"max_open_files=72;"
"max_file_opening_threads=35;"
"max_background_compactions=33;"
"use_fsync=true;"
"use_adaptive_mutex=false;"
"max_total_wal_size=4295005604;"
"compaction_readahead_size=0;"
"new_table_reader_for_compaction_inputs=false;"
"keep_log_file_num=4890;"
"skip_stats_update_on_db_open=false;"
"max_manifest_file_size=4295009941;"
"db_log_dir=path/to/db_log_dir;"
"skip_log_error_on_recovery=true;"
"writable_file_max_buffer_size=1048576;"
"paranoid_checks=true;"
"is_fd_close_on_exec=false;"
"bytes_per_sync=4295013613;"
"enable_thread_tracking=false;"
"disable_data_sync=false;"
"recycle_log_file_num=0;"
"disableDataSync=false;"
"create_missing_column_families=true;"
"log_file_time_to_roll=3097;"
"max_background_flushes=35;"
"create_if_missing=false;"
"error_if_exists=true;"
"allow_os_buffer=false;"
"delayed_write_rate=4294976214;"
"manifest_preallocation_size=1222;"
"allow_mmap_writes=false;"
"stats_dump_period_sec=70127;"
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"max_log_file_size=4607;"
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"
"wal_bytes_per_sync=4295048118;"
"delete_obsolete_files_period_micros=4294967758;"
"WAL_ttl_seconds=4295008036;"
"WAL_size_limit_MB=4295036161;"
"wal_dir=path/to/wal_dir;"
"db_write_buffer_size=2587;"
"max_subcompactions=64330;"
"table_cache_numshardbits=28;"
"max_open_files=72;"
"max_file_opening_threads=35;"
"base_background_compactions=3;"
"max_background_compactions=33;"
"use_fsync=true;"
"use_adaptive_mutex=false;"

View file

@ -61,32 +61,54 @@ void PerfContext::Reset() {
#endif
}
#define OUTPUT(counter) #counter << " = " << counter << ", "
#define OUTPUT(counter) \
if (!exclude_zero_counters || (counter > 0)) { \
ss << #counter << " = " << counter << ", "; \
}
std::string PerfContext::ToString() const {
std::string PerfContext::ToString(bool exclude_zero_counters) const {
#if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE)
return "";
#else
std::ostringstream ss;
ss << OUTPUT(user_key_comparison_count) << OUTPUT(block_cache_hit_count)
<< OUTPUT(block_read_count) << OUTPUT(block_read_byte)
<< OUTPUT(block_read_time) << OUTPUT(block_checksum_time)
<< OUTPUT(block_decompress_time) << OUTPUT(internal_key_skipped_count)
<< OUTPUT(internal_delete_skipped_count) << OUTPUT(write_wal_time)
<< OUTPUT(get_snapshot_time) << OUTPUT(get_from_memtable_time)
<< OUTPUT(get_from_memtable_count) << OUTPUT(get_post_process_time)
<< OUTPUT(get_from_output_files_time) << OUTPUT(seek_on_memtable_time)
<< OUTPUT(seek_on_memtable_count) << OUTPUT(seek_child_seek_time)
<< OUTPUT(seek_child_seek_count) << OUTPUT(seek_min_heap_time)
<< OUTPUT(seek_internal_seek_time) << OUTPUT(find_next_user_entry_time)
<< OUTPUT(write_pre_and_post_process_time) << OUTPUT(write_memtable_time)
<< OUTPUT(db_mutex_lock_nanos) << OUTPUT(db_condition_wait_nanos)
<< OUTPUT(merge_operator_time_nanos) << OUTPUT(write_delay_time)
<< OUTPUT(read_index_block_nanos) << OUTPUT(read_filter_block_nanos)
<< OUTPUT(new_table_block_iter_nanos) << OUTPUT(new_table_iterator_nanos)
<< OUTPUT(block_seek_nanos) << OUTPUT(find_table_nanos)
<< OUTPUT(bloom_memtable_hit_count) << OUTPUT(bloom_memtable_miss_count)
<< OUTPUT(bloom_sst_hit_count) << OUTPUT(bloom_sst_miss_count);
OUTPUT(user_key_comparison_count);
OUTPUT(block_cache_hit_count);
OUTPUT(block_read_count);
OUTPUT(block_read_byte);
OUTPUT(block_read_time);
OUTPUT(block_checksum_time);
OUTPUT(block_decompress_time);
OUTPUT(internal_key_skipped_count);
OUTPUT(internal_delete_skipped_count);
OUTPUT(write_wal_time);
OUTPUT(get_snapshot_time);
OUTPUT(get_from_memtable_time);
OUTPUT(get_from_memtable_count);
OUTPUT(get_post_process_time);
OUTPUT(get_from_output_files_time);
OUTPUT(seek_on_memtable_time);
OUTPUT(seek_on_memtable_count);
OUTPUT(seek_child_seek_time);
OUTPUT(seek_child_seek_count);
OUTPUT(seek_min_heap_time);
OUTPUT(seek_internal_seek_time);
OUTPUT(find_next_user_entry_time);
OUTPUT(write_pre_and_post_process_time);
OUTPUT(write_memtable_time);
OUTPUT(db_mutex_lock_nanos);
OUTPUT(db_condition_wait_nanos);
OUTPUT(merge_operator_time_nanos);
OUTPUT(write_delay_time);
OUTPUT(read_index_block_nanos);
OUTPUT(read_filter_block_nanos);
OUTPUT(new_table_block_iter_nanos);
OUTPUT(new_table_iterator_nanos);
OUTPUT(block_seek_nanos);
OUTPUT(find_table_nanos);
OUTPUT(bloom_memtable_hit_count);
OUTPUT(bloom_memtable_miss_count);
OUTPUT(bloom_sst_hit_count);
OUTPUT(bloom_sst_miss_count);
return ss.str();
#endif
}