Add TablePropertiesCollector::NeedCompact() to suggest DB to further compact output files

Summary:
It is experimental. Allow users to return from a call back function TablePropertiesCollector::NeedCompact(), based on the data in the file.
It can be used to allow users to suggest DB to clear up delete tombstones faster.

Test Plan: Add a unit test.

Reviewers: igor, yhchiang, kradhakrishnan, rven

Reviewed By: rven

Subscribers: yoshinorim, MarkCallaghan, maykov, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D39585
This commit is contained in:
sdong 2015-06-04 12:03:40 -07:00
parent 2e764f06ea
commit 6df589b446
17 changed files with 196 additions and 34 deletions

View file

@ -222,6 +222,7 @@ Status BuildTable(
}
if (s.ok()) {
meta->fd.file_size = builder->FileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0);
if (table_properties) {
*table_properties = builder->GetTableProperties();

View file

@ -66,6 +66,7 @@ struct CompactionJob::CompactionState {
uint64_t file_size;
InternalKey smallest, largest;
SequenceNumber smallest_seqno, largest_seqno;
bool need_compaction;
};
std::vector<Output> outputs;
@ -1016,6 +1017,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
// Check for iterator errors
Status s = input->status();
const uint64_t current_entries = compact_->builder->NumEntries();
compact_->current_output()->need_compaction =
compact_->builder->NeedCompact();
if (s.ok()) {
s = compact_->builder->Finish();
} else {
@ -1106,9 +1109,10 @@ Status CompactionJob::InstallCompactionResults(
compaction->AddInputDeletions(compact_->compaction->edit());
for (size_t i = 0; i < compact_->outputs.size(); i++) {
const CompactionState::Output& out = compact_->outputs[i];
compaction->edit()->AddFile(
compaction->output_level(), out.number, out.path_id, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
compaction->edit()->AddFile(compaction->output_level(), out.number,
out.path_id, out.file_size, out.smallest,
out.largest, out.smallest_seqno,
out.largest_seqno, out.need_compaction);
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, compaction->edit(),

View file

@ -90,7 +90,7 @@ class CompactionJobTest : public testing::Test {
VersionEdit edit;
edit.AddFile(0, file_number, 0, 10, smallest, largest, smallest_seqno,
largest_seqno);
largest_seqno, false);
mutex_.Lock();
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),

View file

@ -1207,7 +1207,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (s.ok() && meta.fd.GetFileSize() > 0) {
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno);
meta.smallest_seqno, meta.largest_seqno,
meta.marked_for_compaction);
}
InternalStats::CompactionStats stats(1);
@ -1768,7 +1769,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
}
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] Apply version edit:\n%s",
@ -2457,7 +2459,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
LogToBuffer(log_buffer,
"[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",

View file

@ -130,7 +130,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.DeleteFile(0, f->fd.GetNumber());
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),

View file

@ -12957,6 +12957,121 @@ TEST_F(DBTest, CompressLevelCompaction) {
Destroy(options);
}
class CountingDeleteTabPropCollector : public TablePropertiesCollector {
public:
const char* Name() const override { return "CountingDeleteTabPropCollector"; }
Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override {
if (type == kEntryDelete) {
num_deletes_++;
}
return Status::OK();
}
bool NeedCompact() const override { return num_deletes_ > 10; }
UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
Status Finish(UserCollectedProperties* properties) override {
*properties =
UserCollectedProperties{{"num_delete", ToString(num_deletes_)}};
return Status::OK();
}
private:
uint32_t num_deletes_ = 0;
};
class CountingDeleteTabPropCollectorFactory
: public TablePropertiesCollectorFactory {
public:
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override {
return new CountingDeleteTabPropCollector();
}
const char* Name() const override {
return "CountingDeleteTabPropCollectorFactory";
}
};
TEST_F(DBTest, TablePropertiesNeedCompactTest) {
Random rnd(301);
Options options;
options.create_if_missing = true;
options.write_buffer_size = 4096;
options.max_write_buffer_number = 8;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 4;
options.target_file_size_base = 2048;
options.max_bytes_for_level_base = 10240;
options.max_bytes_for_level_multiplier = 4;
options.hard_rate_limit = 1.1;
options.num_levels = 8;
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory(
new CountingDeleteTabPropCollectorFactory);
options.table_properties_collector_factories.resize(1);
options.table_properties_collector_factories[0] = collector_factory;
DestroyAndReopen(options);
const int kMaxKey = 1000;
for (int i = 0; i < kMaxKey; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 102)));
ASSERT_OK(Put(Key(kMaxKey + i), RandomString(&rnd, 102)));
}
Flush();
dbfull()->TEST_WaitForCompact();
if (NumTableFilesAtLevel(0) == 1) {
// Clear Level 0 so that when later flush a file with deletions,
// we don't trigger an organic compaction.
ASSERT_OK(Put(Key(0), ""));
ASSERT_OK(Put(Key(kMaxKey * 2), ""));
Flush();
dbfull()->TEST_WaitForCompact();
}
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
{
int c = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->Seek(Key(kMaxKey - 100));
while (iter->Valid() && iter->key().compare(Key(kMaxKey + 100)) < 0) {
iter->Next();
++c;
}
ASSERT_EQ(c, 200);
}
Delete(Key(0));
for (int i = kMaxKey - 100; i < kMaxKey + 100; i++) {
Delete(Key(i));
}
Delete(Key(kMaxKey * 2));
Flush();
dbfull()->TEST_WaitForCompact();
{
SetPerfLevel(kEnableCount);
perf_context.Reset();
int c = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->Seek(Key(kMaxKey - 100));
while (iter->Valid() && iter->key().compare(Key(kMaxKey + 100)) < 0) {
iter->Next();
}
ASSERT_EQ(c, 0);
ASSERT_LT(perf_context.internal_delete_skipped_count, 30u);
ASSERT_LT(perf_context.internal_key_skipped_count, 30u);
SetPerfLevel(kDisable);
}
}
TEST_F(DBTest, SuggestCompactRangeTest) {
class CompactionFilterFactoryGetContext : public CompactionFilterFactory {
public:
@ -12971,7 +13086,7 @@ TEST_F(DBTest, SuggestCompactRangeTest) {
}
static bool IsManual(CompactionFilterFactory* compaction_filter_factory) {
return reinterpret_cast<CompactionFilterFactoryGetContext*>(
compaction_filter_factory)->saved_context.is_manual_compaction;
compaction_filter_factory)->saved_context.is_manual_compaction;
}
CompactionFilter::Context saved_context;
};

View file

@ -296,7 +296,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno);
meta.smallest_seqno, meta.largest_seqno,
meta.marked_for_compaction);
}
InternalStats::CompactionStats stats(1);

View file

@ -401,7 +401,8 @@ class Repairer {
const TableInfo& t = tables_[i];
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest,
t.min_sequence, t.max_sequence);
t.min_sequence, t.max_sequence,
t.meta.marked_for_compaction);
}
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());

View file

@ -32,6 +32,8 @@ class IntTblPropCollector {
uint64_t file_size) = 0;
virtual UserCollectedProperties GetReadableProperties() const = 0;
virtual bool NeedCompact() const { return false; }
};
// Facrtory for internal table properties collector.
@ -98,6 +100,10 @@ class UserKeyTablePropertiesCollector : public IntTblPropCollector {
UserCollectedProperties GetReadableProperties() const override;
virtual bool NeedCompact() const override {
return collector_->NeedCompact();
}
protected:
std::unique_ptr<TablePropertiesCollector> collector_;
};

View file

@ -115,7 +115,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
GetInternalKey("350"), 200, 200, false);
version_edit.DeleteFile(3, 27U);
EnvOptions env_options;
@ -149,7 +149,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
VersionEdit version_edit;
version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
GetInternalKey("350"), 200, 200, false);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
@ -186,7 +186,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
VersionEdit version_edit;
version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
GetInternalKey("350"), 200, 200, false);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
version_edit.DeleteFile(4, 6U);
@ -214,15 +214,15 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
GetInternalKey("350"), 200, 200, false);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200);
GetInternalKey("450"), 200, 200, false);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200);
GetInternalKey("650"), 200, 200, false);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200);
GetInternalKey("550"), 200, 200, false);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200);
GetInternalKey("750"), 200, 200, false);
EnvOptions env_options;
@ -248,24 +248,24 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
GetInternalKey("350"), 200, 200, false);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200);
GetInternalKey("450"), 200, 200, false);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200);
GetInternalKey("650"), 200, 200, false);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200);
GetInternalKey("550"), 200, 200, false);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200);
GetInternalKey("750"), 200, 200, false);
version_builder.Apply(&version_edit);
VersionEdit version_edit2;
version_edit.AddFile(2, 808, 0, 100U, GetInternalKey("901"),
GetInternalKey("950"), 200, 200);
GetInternalKey("950"), 200, 200, false);
version_edit2.DeleteFile(2, 616);
version_edit2.DeleteFile(2, 636);
version_edit.AddFile(2, 806, 0, 100U, GetInternalKey("801"),
GetInternalKey("850"), 200, 200);
GetInternalKey("850"), 200, 200, false);
version_builder.Apply(&version_edit2);
version_builder.SaveTo(&new_vstorage);

View file

@ -169,7 +169,8 @@ class VersionEdit {
void AddFile(int level, uint64_t file, uint32_t file_path_id,
uint64_t file_size, const InternalKey& smallest,
const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno) {
const SequenceNumber& largest_seqno,
bool marked_for_compaction) {
assert(smallest_seqno <= largest_seqno);
FileMetaData f;
f.fd = FileDescriptor(file, file_path_id, file_size);
@ -177,6 +178,7 @@ class VersionEdit {
f.largest = largest;
f.smallest_seqno = smallest_seqno;
f.largest_seqno = largest_seqno;
f.marked_for_compaction = marked_for_compaction;
new_files_.push_back(std::make_pair(level, f));
}

View file

@ -34,7 +34,7 @@ TEST_F(VersionEditTest, EncodeDecode) {
edit.AddFile(3, kBig + 300 + i, kBig32Bit + 400 + i, 0,
InternalKey("foo", kBig + 500 + i, kTypeValue),
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
kBig + 500 + i, kBig + 600 + i);
kBig + 500 + i, kBig + 600 + i, false);
edit.DeleteFile(4, kBig + 700 + i);
}
@ -47,10 +47,7 @@ TEST_F(VersionEditTest, EncodeDecode) {
TEST_F(VersionEditTest, EncodeEmptyFile) {
VersionEdit edit;
edit.AddFile(0, 0, 0, 0,
InternalKey(),
InternalKey(),
0, 0);
edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false);
std::string buffer;
ASSERT_TRUE(!edit.EncodeTo(&buffer));
}

View file

@ -1078,7 +1078,19 @@ void VersionStorageInfo::ComputeCompactionScore(
void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
files_marked_for_compaction_.clear();
for (int level = 0; level <= MaxInputLevel(); level++) {
int last_qualify_level = 0;
// Do not include files from the last level with data
// If table properties collector suggests a file on the last level,
// we should not move it to a new level.
for (int level = num_levels() - 1; level >= 1; level--) {
if (!files_[level].empty()) {
last_qualify_level = level - 1;
break;
}
}
for (int level = 0; level <= last_qualify_level; level++) {
for (auto* f : files_[level]) {
if (!f->being_compacted && f->marked_for_compaction) {
files_marked_for_compaction_.emplace_back(level, f);
@ -2785,7 +2797,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
cfd->current()->storage_info()->LevelFiles(level)) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
}
}
edit.SetLogNumber(cfd->GetLogNumber());

View file

@ -129,6 +129,9 @@ class TablePropertiesCollector {
// The name of the properties collector can be used for debugging purpose.
virtual const char* Name() const = 0;
// EXPERIMENTAL Return whether the output file should be further compacted
virtual bool NeedCompact() const { return false; }
};
// Constructs TablePropertiesCollector. Internals create a new

View file

@ -848,6 +848,15 @@ uint64_t BlockBasedTableBuilder::FileSize() const {
return rep_->offset;
}
bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) {
return true;
}
}
return false;
}
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
TableProperties ret = rep_->props;
for (const auto& collector : rep_->table_properties_collectors) {

View file

@ -70,6 +70,8 @@ class BlockBasedTableBuilder : public TableBuilder {
// Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override;
bool NeedCompact() const override;
// Get table properties
TableProperties GetTableProperties() const override;

View file

@ -82,6 +82,10 @@ class TableBuilder {
// Finish() call, returns the size of the final generated file.
virtual uint64_t FileSize() const = 0;
// If the user defined table properties collector suggest the file to
// be further compacted.
virtual bool NeedCompact() const { return false; }
// Returns table properties
virtual TableProperties GetTableProperties() const = 0;
};