force flushing stats CF to avoid holding old logs (#5509)

Summary:
WAL records RocksDB writes to all column families. When user flushes a a column family, the old WAL will not accept new writes but cannot be deleted yet because it may still contain live data for other column families. (See https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log#life-cycle-of-a-wal for detailed explanation)
Because of this, if there is a column family that receive very infrequent writes and no manual flush is called for it, it could prevent a lot of WALs from being deleted. PR https://github.com/facebook/rocksdb/pull/5046 introduced persistent stats column family which is a good example of such column families. Depending on the config, it may have long intervals between writes, and user is unaware of it which makes it difficult to call manual flush for it.
This PR addresses the problem for persistent stats column family by forcing a flush for persistent stats column family when 1) another column family is flushed 2) persistent stats column family's log number is the smallest among all column families, this way persistent stats column family will  keep advancing its log number when necessary, allowing RocksDB to delete old WAL files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5509

Differential Revision: D16045896

Pulled By: miasantreble

fbshipit-source-id: 286837b633e988417f0096ff38384742d3b40ef4
This commit is contained in:
Zhongyi Xie 2019-07-01 11:53:25 -07:00 committed by Facebook Github Bot
parent c360675750
commit 3886dddc3b
4 changed files with 142 additions and 2 deletions

View File

@ -1292,6 +1292,8 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context);
void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);

View File

@ -1551,13 +1551,39 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
s = SwitchMemtable(cfd, &context);
}
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
flush_memtable_id = cfd->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd, flush_memtable_id);
}
if (immutable_db_options_.persist_stats_to_disk) {
ColumnFamilyData* cfd_stats =
versions_->GetColumnFamilySet()->GetColumnFamily(
kPersistentStatsColumnFamilyName);
if (cfd_stats != nullptr && cfd_stats != cfd &&
!cfd_stats->mem()->IsEmpty()) {
// only force flush stats CF when it will be the only CF lagging
// behind after the current flush
bool stats_cf_flush_needed = true;
for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
if (loop_cfd == cfd_stats || loop_cfd == cfd) {
continue;
}
if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
stats_cf_flush_needed = false;
}
}
if (stats_cf_flush_needed) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Force flushing stats CF with manual flush of %s "
"to avoid holding old logs", cfd->GetName().c_str());
s = SwitchMemtable(cfd_stats, &context);
flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd_stats, flush_memtable_id);
}
}
}
}
if (s.ok() && !flush_req.empty()) {

View File

@ -1228,6 +1228,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
cfds.push_back(cfd);
}
}
MaybeFlushStatsCF(&cfds);
}
for (const auto cfd : cfds) {
cfd->Ref();
@ -1294,6 +1295,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
if (cfd_picked != nullptr) {
cfds.push_back(cfd_picked);
}
MaybeFlushStatsCF(&cfds);
}
for (const auto cfd : cfds) {
@ -1437,6 +1439,40 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
return Status::OK();
}
void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
assert(cfds != nullptr);
if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
ColumnFamilyData* cfd_stats =
versions_->GetColumnFamilySet()->GetColumnFamily(
kPersistentStatsColumnFamilyName);
if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
for (ColumnFamilyData* cfd : *cfds) {
if (cfd == cfd_stats) {
// stats CF already included in cfds
return;
}
}
// force flush stats CF when its log number is less than all other CF's
// log numbers
bool force_flush_stats_cf = true;
for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
if (loop_cfd == cfd_stats) {
continue;
}
if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
force_flush_stats_cf = false;
}
}
if (force_flush_stats_cf) {
cfds->push_back(cfd_stats);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Force flushing stats CF with automated flush "
"to avoid holding old logs");
}
}
}
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
@ -1450,6 +1486,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfds.push_back(tmp_cfd);
}
MaybeFlushStatsCF(&cfds);
}
Status status;
for (auto& cfd : cfds) {

View File

@ -561,7 +561,7 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) {
Close();
// Reopen and flush memtable.
Reopen(options);
ASSERT_OK(TryReopen(options));
Flush();
Close();
// Now check keys in read only mode.
@ -569,6 +569,81 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) {
}
#endif // !ROCKSDB_LITE
TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb
options.stats_persist_period_sec = 5;
options.statistics = rocksdb::CreateDBStatistics();
options.persist_stats_to_disk = true;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
CreateColumnFamilies({"pikachu"}, options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
ColumnFamilyData* cfd_default =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
->cfd();
ColumnFamilyData* cfd_stats = static_cast<ColumnFamilyHandleImpl*>(
dbfull()->PersistentStatsColumnFamily())
->cfd();
ColumnFamilyData* cfd_test =
static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
ASSERT_OK(Put("foo", "v0"));
ASSERT_OK(Put("bar", "v0"));
ASSERT_EQ("v0", Get("bar"));
ASSERT_EQ("v0", Get("foo"));
ASSERT_OK(Put(1, "Eevee", "v0"));
ASSERT_EQ("v0", Get(1, "Eevee"));
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
// writing to all three cf, flush default cf
// LogNumbers: default: 14, stats: 4, pikachu: 4
ASSERT_OK(Flush());
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
ASSERT_OK(Put("foo1", "v1"));
ASSERT_OK(Put("bar1", "v1"));
ASSERT_EQ("v1", Get("bar1"));
ASSERT_EQ("v1", Get("foo1"));
ASSERT_OK(Put(1, "Vaporeon", "v1"));
ASSERT_EQ("v1", Get(1, "Vaporeon"));
// writing to default and test cf, flush test cf
// LogNumbers: default: 14, stats: 16, pikachu: 16
ASSERT_OK(Flush(1));
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
ASSERT_GT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
ASSERT_OK(Put("foo2", "v2"));
ASSERT_OK(Put("bar2", "v2"));
ASSERT_EQ("v2", Get("bar2"));
ASSERT_EQ("v2", Get("foo2"));
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(10); });
// writing to default and stats cf, flushing default cf
// LogNumbers: default: 19, stats: 19, pikachu: 19
ASSERT_OK(Flush());
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
ASSERT_OK(Put("foo3", "v3"));
ASSERT_OK(Put("bar3", "v3"));
ASSERT_EQ("v3", Get("bar3"));
ASSERT_EQ("v3", Get("foo3"));
ASSERT_OK(Put(1, "Jolteon", "v3"));
ASSERT_EQ("v3", Get(1, "Jolteon"));
dbfull()->TEST_WaitForPersistStatsRun(
[&] { mock_env->set_current_time(15); });
// writing to all three cf, flushing test cf
// LogNumbers: default: 19, stats: 19, pikachu: 22
ASSERT_OK(Flush(1));
ASSERT_LT(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
Close();
}
} // namespace rocksdb
int main(int argc, char** argv) {