mirror of https://github.com/facebook/rocksdb.git
Flush stale column families
Summary: Added a new option, `max_total_wal_size`. Once the total size of all live WAL files exceeds it, we attempt to flush every column family that still has data in the earliest WAL file. By default, `max_total_wal_size` is calculated dynamically, which should be good enough for non-advanced customers.

Test Plan: Added a test

Reviewers: dhruba, haobo, sdong, ljin, yhchiang

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18345
parent 7dafa3a1d7
commit df70047669
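The core idea of the change, as a rough standalone sketch: pick an effective WAL size limit (the user-supplied value, or twice the sum of all memtable budgets when the option is 0), and once the total live WAL size crosses it, flush any column family that still references the earliest WAL file. The struct and function names below are invented for illustration and do not appear in the diff.

#include <cstdint>
#include <vector>

// Hypothetical per-column-family accounting, for illustration only.
struct ColumnFamilyInfo {
  uint64_t write_buffer_size;
  int max_write_buffer_number;
  uint64_t oldest_log_number;  // earliest WAL still referenced by this CF
};

// Effective WAL limit: the user-supplied max_total_wal_size, or, when it is 0,
// twice the sum of write_buffer_size * max_write_buffer_number over all
// column families (the dynamic default described in the summary).
uint64_t EffectiveWalLimit(uint64_t max_total_wal_size,
                           const std::vector<ColumnFamilyInfo>& cfs) {
  if (max_total_wal_size != 0) {
    return max_total_wal_size;
  }
  uint64_t total_in_memory_state = 0;
  for (const auto& cf : cfs) {
    total_in_memory_state += cf.write_buffer_size * cf.max_write_buffer_number;
  }
  return 2 * total_in_memory_state;
}

// A column family is "stale" if it still has data in the earliest live WAL:
// flushing it is what allows that WAL file to eventually be deleted.
bool ShouldFlushStale(const ColumnFamilyInfo& cf, uint64_t earliest_wal_number) {
  return cf.oldest_log_number <= earliest_wal_number;
}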
@@ -238,7 +238,7 @@ class ColumnFamilyTest {
     return result;
   }

-  int CountLiveFiles(int cf) {
+  int CountLiveFiles() {
     std::vector<LiveFileMetaData> metadata;
     db_->GetLiveFilesMetaData(&metadata);
     return static_cast<int>(metadata.size());
@@ -396,10 +396,10 @@ TEST(ColumnFamilyTest, DropTest) {
     }
     ASSERT_EQ("bar1", Get(1, "1"));

-    ASSERT_EQ(CountLiveFiles(1), 1);
+    ASSERT_EQ(CountLiveFiles(), 1);
     DropColumnFamilies({1});
     // make sure that all files are deleted when we drop the column family
-    ASSERT_EQ(CountLiveFiles(1), 0);
+    ASSERT_EQ(CountLiveFiles(), 0);
     Destroy();
   }
 }
@@ -545,6 +545,7 @@ TEST(ColumnFamilyTest, FlushTest) {

 // Makes sure that obsolete log files get deleted
 TEST(ColumnFamilyTest, LogDeletionTest) {
+  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
   column_family_options_.write_buffer_size = 100000;  // 100KB
   Open();
   CreateColumnFamilies({"one", "two", "three", "four"});
@@ -611,6 +612,8 @@ TEST(ColumnFamilyTest, LogDeletionTest) {

 // Makes sure that obsolete log files get deleted
 TEST(ColumnFamilyTest, DifferentWriteBufferSizes) {
+  // disable flushing stale column families
+  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
   Open();
   CreateColumnFamilies({"one", "two", "three"});
   ColumnFamilyOptions default_cf, one, two, three;
@@ -938,6 +941,35 @@ TEST(ColumnFamilyTest, DontRollEmptyLogs) {
   Close();
 }

+TEST(ColumnFamilyTest, FlushStaleColumnFamilies) {
+  Open();
+  CreateColumnFamilies({"one", "two"});
+  ColumnFamilyOptions default_cf, one, two;
+  default_cf.write_buffer_size = 100000;  // small write buffer size
+  default_cf.disable_auto_compactions = true;
+  one.disable_auto_compactions = true;
+  two.disable_auto_compactions = true;
+  db_options_.max_total_wal_size = 210000;
+
+  Reopen({default_cf, one, two});
+
+  PutRandomData(2, 1, 10);  // 10 bytes
+  for (int i = 0; i < 2; ++i) {
+    PutRandomData(0, 100, 1000);  // flush
+    WaitForFlush(0);
+    ASSERT_EQ(i + 1, CountLiveFiles());
+  }
+  // third flush. now, CF [two] should be detected as stale and flushed
+  // column family 1 should not be flushed since it's empty
+  PutRandomData(0, 100, 1000);  // flush
+  WaitForFlush(0);
+  WaitForFlush(2);
+  // 3 files for default column families, 1 file for column family [two], zero
+  // files for column family [one], because it's empty
+  ASSERT_EQ(4, CountLiveFiles());
+  Close();
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
@@ -350,6 +350,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       logfile_number_(0),
       log_empty_(true),
       default_cf_handle_(nullptr),
+      total_log_size_(0),
+      max_total_in_memory_state_(0),
       tmp_batch_(),
       bg_schedule_needed_(false),
       bg_compaction_scheduled_(0),
@@ -1186,6 +1188,11 @@ Status DBImpl::Recover(
                            versions_->LastSequence());
   }

+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    max_total_in_memory_state_ += cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
+  }
+
   return s;
 }

@@ -1434,9 +1441,6 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
       cfd->GetName().c_str(), (unsigned long)meta.number,
       (unsigned long)meta.file_size, s.ToString().c_str());

-  Version::LevelSummaryStorage tmp;
-  Log(options_.info_log, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
-      cfd->current()->LevelSummary(&tmp));
   if (!options_.disableDataSync) {
     db_directory_->Fsync();
   }
@@ -1536,14 +1540,19 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
     if (madeProgress) {
       *madeProgress = 1;
     }
+    Version::LevelSummaryStorage tmp;
+    LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
+                cfd->current()->LevelSummary(&tmp));

     MaybeScheduleLogDBDeployStats();

     if (disable_delete_obsolete_files_ == 0) {
       // add to deletion state
       while (alive_log_files_.size() &&
-             *alive_log_files_.begin() < versions_->MinLogNumber()) {
-        deletion_state.log_delete_files.push_back(*alive_log_files_.begin());
+             alive_log_files_.begin()->number < versions_->MinLogNumber()) {
+        const auto& earliest = *alive_log_files_.begin();
+        deletion_state.log_delete_files.push_back(earliest.number);
+        total_log_size_ -= earliest.size;
         alive_log_files_.pop_front();
       }
     }
@@ -3420,6 +3429,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
     *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
     Log(options_.info_log, "Created column family [%s] (ID %u)",
         column_family_name.c_str(), (unsigned)cfd->GetID());
+    max_total_in_memory_state_ += cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
   } else {
     Log(options_.info_log, "Creating column family [%s] FAILED -- %s",
         column_family_name.c_str(), s.ToString().c_str());
@@ -3451,6 +3462,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {

   if (s.ok()) {
     assert(cfd->IsDropped());
+    max_total_in_memory_state_ -= cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
     Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID());
     // Flush the memtables. This will make all WAL files referencing dropped
     // column family to be obsolete. They will be deleted once user deletes
@@ -3631,6 +3644,19 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
   }

+  int64_t flush_column_family_if_log_file = 0;
+  uint64_t max_total_wal_size = (options_.max_total_wal_size == 0)
+                                    ? 2 * max_total_in_memory_state_
+                                    : options_.max_total_wal_size;
+  if (alive_log_files_.begin()->getting_flushed == false &&
+      total_log_size_ > max_total_wal_size) {
+    flush_column_family_if_log_file = alive_log_files_.begin()->number;
+    alive_log_files_.begin()->getting_flushed = true;
+    Log(options_.info_log,
+        "Flushing all column families with data in WAL number %" PRIu64,
+        flush_column_family_if_log_file);
+  }
+
   Status status;
   // refcounting cfd in iteration
   bool dead_cfd = false;
@@ -3638,8 +3664,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   autovector<log::Writer*> logs_to_free;
   for (auto cfd : *versions_->GetColumnFamilySet()) {
     cfd->Ref();
+    bool force_flush = my_batch == nullptr ||
+                       (flush_column_family_if_log_file != 0 &&
+                        cfd->GetLogNumber() <= flush_column_family_if_log_file);
     // May temporarily unlock and wait.
-    status = MakeRoomForWrite(cfd, my_batch == nullptr, &superversions_to_free,
+    status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free,
                               &logs_to_free);
     if (cfd->Unref()) {
       dead_cfd = true;
@@ -3693,6 +3722,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
         PERF_TIMER_START(write_wal_time);
         Slice log_entry = WriteBatchInternal::Contents(updates);
         status = log_->AddRecord(log_entry);
+        total_log_size_ += log_entry.size();
+        alive_log_files_.back().AddSize(log_entry.size());
         log_empty_ = false;
         RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
         RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
@@ -4023,7 +4054,7 @@ Status DBImpl::MakeRoomForWrite(
         logs_to_free->push_back(log_.release());
         log_.reset(new_log);
         log_empty_ = true;
-        alive_log_files_.push_back(logfile_number_);
+        alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
         for (auto cfd : *versions_->GetColumnFamilySet()) {
           // all this is just optimization to delete logs that
           // are no longer needed -- if CF is empty, that means it
@@ -4415,7 +4446,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
       delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
     }
-    impl->alive_log_files_.push_back(impl->logfile_number_);
+    impl->alive_log_files_.push_back(
+        DBImpl::LogFileNumberSize(impl->logfile_number_));
     impl->DeleteObsoleteFiles();
     impl->MaybeScheduleFlushOrCompaction();
     impl->MaybeScheduleLogDBDeployStats();
db/db_impl.h
@@ -455,7 +455,19 @@ class DBImpl : public DB {
   bool log_empty_;
   ColumnFamilyHandleImpl* default_cf_handle_;
   unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
-  std::deque<uint64_t> alive_log_files_;
+  struct LogFileNumberSize {
+    explicit LogFileNumberSize(uint64_t _number)
+        : number(_number), size(0), getting_flushed(false) {}
+    void AddSize(uint64_t new_size) { size += new_size; }
+    uint64_t number;
+    uint64_t size;
+    bool getting_flushed;
+  };
+  std::deque<LogFileNumberSize> alive_log_files_;
+  uint64_t total_log_size_;
+  // only used for dynamically adjusting max_total_wal_size. it is a sum of
+  // [write_buffer_size * max_write_buffer_number] over all column families
+  uint64_t max_total_in_memory_state_;

   std::string host_name_;

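To make the bookkeeping declared in the header above concrete, here is a small standalone sketch of the same idea: a new entry is appended whenever a WAL file is created, its size grows with every write, and entries are popped (and their sizes subtracted from the running total) once the WAL becomes obsolete. The WalTracker name and its simplified interface are invented for illustration; the real accounting lives inside DBImpl.

#include <cstdint>
#include <deque>

// Illustrative stand-in for LogFileNumberSize / alive_log_files_ / total_log_size_.
class WalTracker {
 public:
  // Called when a fresh WAL file is created.
  void OnNewWalFile(uint64_t number) { files_.push_back({number, 0, false}); }

  // Called after appending a record to the newest WAL.
  void OnWrite(uint64_t bytes) {
    files_.back().size += bytes;
    total_size_ += bytes;
  }

  // Drop accounting for WALs older than the minimum log number still needed.
  void OnObsolete(uint64_t min_log_number) {
    while (!files_.empty() && files_.front().number < min_log_number) {
      total_size_ -= files_.front().size;
      files_.pop_front();
    }
  }

  uint64_t total_size() const { return total_size_; }

 private:
  struct Entry {
    uint64_t number;
    uint64_t size;
    bool getting_flushed;
  };
  std::deque<Entry> files_;
  uint64_t total_size_ = 0;
};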
@@ -585,6 +585,14 @@ struct DBOptions {
   // Default: 5000
   int max_open_files;

+  // Once write-ahead logs exceed this size, we will start forcing the flush of
+  // column families whose memtables are backed by the oldest live WAL file
+  // (i.e. the ones that are causing all the space amplification). If set to 0
+  // (default), we will dynamically choose the WAL size limit to be
+  // [sum of all write_buffer_size * max_write_buffer_number] * 2
+  // Default: 0
+  uint64_t max_total_wal_size;
+
   // If non-null, then we should collect metrics about database operations
   // Statistics objects should not be shared between DB instances as
   // it does not use any locks to prevent concurrent updates.
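For context, a minimal usage sketch of the new option; the database path and the 64 MB cap below are made up for illustration, and the option is set through rocksdb::Options just like the other DBOptions fields shown in the hunk above.

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Cap the total size of live WAL files at 64 MB; once exceeded, column
  // families with data in the oldest WAL are flushed so that file can be
  // deleted. Leaving it at 0 keeps the dynamic default described above.
  options.max_total_wal_size = 64 * 1024 * 1024;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/wal_limit_example", &db);
  if (s.ok()) {
    delete db;
  }
  return s.ok() ? 0 : 1;
}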
@@ -9,6 +9,8 @@

 #include "rocksdb/options.h"

+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
 #include <limits>

 #include "rocksdb/cache.h"
@@ -160,6 +162,7 @@ DBOptions::DBOptions()
       info_log(nullptr),
       info_log_level(INFO_LEVEL),
       max_open_files(5000),
+      max_total_wal_size(0),
       statistics(nullptr),
       disableDataSync(false),
       use_fsync(false),
@@ -198,6 +201,7 @@ DBOptions::DBOptions(const Options& options)
       info_log(options.info_log),
       info_log_level(options.info_log_level),
       max_open_files(options.max_open_files),
+      max_total_wal_size(options.max_total_wal_size),
       statistics(options.statistics),
       disableDataSync(options.disableDataSync),
       use_fsync(options.use_fsync),
@@ -241,6 +245,7 @@ void DBOptions::Dump(Logger* log) const {
   Log(log, " Options.env: %p", env);
   Log(log, " Options.info_log: %p", info_log.get());
   Log(log, " Options.max_open_files: %d", max_open_files);
+  Log(log, " Options.max_total_wal_size: %" PRIu64, max_total_wal_size);
   Log(log, " Options.disableDataSync: %d", disableDataSync);
   Log(log, " Options.use_fsync: %d", use_fsync);
   Log(log, " Options.max_log_file_size: %zu", max_log_file_size);