Support options.ttl with options.max_open_files = -1 (#6060)

Summary:
Previously, options.ttl cannot be set with options.max_open_files = -1, because it makes use of creation_time field in table properties, which is not available unless max_open_files = -1. With this commit, the information will be stored in manifest and when it is available, will be used instead.

Note that, this change will break forward compatibility for release 5.1 and older.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6060

Test Plan: Extend existing test case to options.max_open_files != -1, and simulate backward compatility in one test case by forcing the value to be 0.

Differential Revision: D18631623

fbshipit-source-id: 30c232a8672de5432ce9608bb2488ecc19138830
This commit is contained in:
sdong 2019-11-22 16:01:21 -08:00 committed by Facebook Github Bot
parent adcf920f40
commit d8c28e692a
22 changed files with 344 additions and 274 deletions

View file

@ -17,6 +17,7 @@
* A batched MultiGet API (DB::MultiGet()) that supports retrieving keys from multiple column families.
* Full and partitioned filters in the block-based table use an improved Bloom filter implementation, enabled with format_version 5 (or above) because previous releases cannot read this filter. This replacement is faster and more accurate, especially for high bits per key or millions of keys in a single (full) filter. For example, the new Bloom filter has a lower false positive rate at 16 bits per key than the old one at 100 bits per key.
* Added AVX2 instructions to USE_SSE builds to accelerate the new Bloom filter and XXH3-based hash function on compatible x86_64 platforms (Haswell and later, ~2014).
* Support options.ttl with options.max_open_files = -1. File's oldest ancester time will be written to manifest. If it is availalbe, this information will be used instead of creation_time in table properties.
### Performance Improvements
* For 64-bit hashing, RocksDB is standardizing on a slightly modified preview version of XXH3. This function is now used for many non-persisted hashes, along with fastrange64() in place of the modulus operator, and some benchmarks show a slight improvement.

View file

@ -1198,11 +1198,6 @@ Status ColumnFamilyData::ValidateOptions(
}
if (cf_options.ttl > 0) {
if (db_options.max_open_files != -1) {
return Status::NotSupported(
"TTL is only supported when files are always "
"kept open (set max_open_files = -1). ");
}
if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
return Status::NotSupported(
"TTL is only supported in Block-Based Table format. ");

View file

@ -545,17 +545,16 @@ bool Compaction::ShouldFormSubcompactions() const {
}
}
uint64_t Compaction::MinInputFileCreationTime() const {
uint64_t min_creation_time = port::kMaxUint64;
uint64_t Compaction::MinInputFileOldestAncesterTime() const {
uint64_t min_oldest_ancester_time = port::kMaxUint64;
for (const auto& file : inputs_[0].files) {
if (file->fd.table_reader != nullptr &&
file->fd.table_reader->GetTableProperties() != nullptr) {
uint64_t creation_time =
file->fd.table_reader->GetTableProperties()->creation_time;
min_creation_time = std::min(min_creation_time, creation_time);
uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
if (oldest_ancester_time != 0) {
min_oldest_ancester_time =
std::min(min_oldest_ancester_time, oldest_ancester_time);
}
}
return min_creation_time;
return min_oldest_ancester_time;
}
int Compaction::GetInputBaseLevel() const {

View file

@ -291,7 +291,7 @@ class Compaction {
uint32_t max_subcompactions() const { return max_subcompactions_; }
uint64_t MinInputFileCreationTime() const;
uint64_t MinInputFileOldestAncesterTime() const;
private:
// mark (or clear) all files that are being compacted

View file

@ -1479,12 +1479,32 @@ Status CompactionJob::OpenCompactionOutputFile(
return s;
}
SubcompactionState::Output out;
out.meta.fd =
FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0);
out.finished = false;
// Try to figure out the output file's oldest ancester time.
int64_t temp_current_time = 0;
auto get_time_status = env_->GetCurrentTime(&temp_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time. Status: %s",
get_time_status.ToString().c_str());
}
uint64_t current_time = static_cast<uint64_t>(temp_current_time);
uint64_t oldest_ancester_time =
sub_compact->compaction->MinInputFileOldestAncesterTime();
if (oldest_ancester_time == port::kMaxUint64) {
oldest_ancester_time = current_time;
}
// Initialize a SubcompactionState::Output and add it to sub_compact->outputs
{
SubcompactionState::Output out;
out.meta.fd = FileDescriptor(file_number,
sub_compact->compaction->output_path_id(), 0);
out.meta.oldest_ancester_time = oldest_ancester_time;
out.finished = false;
sub_compact->outputs.push_back(out);
}
sub_compact->outputs.push_back(out);
writable_file->SetIOPriority(Env::IO_LOW);
writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@ -1501,21 +1521,6 @@ Status CompactionJob::OpenCompactionOutputFile(
bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
int64_t temp_current_time = 0;
auto get_time_status = env_->GetCurrentTime(&temp_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time. Status: %s",
get_time_status.ToString().c_str());
}
uint64_t current_time = static_cast<uint64_t>(temp_current_time);
uint64_t creation_time = sub_compact->compaction->MinInputFileCreationTime();
if (creation_time == port::kMaxUint64) {
creation_time = current_time;
}
sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
@ -1523,9 +1528,9 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->output_compression(),
0 /*sample_for_compression */,
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), skip_filters, creation_time,
0 /* oldest_key_time */, sub_compact->compaction->max_output_file_size(),
current_time));
sub_compact->compaction->output_level(), skip_filters,
oldest_ancester_time, 0 /* oldest_key_time */,
sub_compact->compaction->max_output_file_size(), current_time));
LogFlush(db_options_.info_log);
return s;
}

View file

@ -183,7 +183,8 @@ class CompactionJobTest : public testing::Test {
VersionEdit edit;
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, oldest_blob_file_number);
smallest_seqno, largest_seqno, false, oldest_blob_file_number,
kUnknownOldestAncesterTime);
mutex_.Lock();
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),

View file

@ -92,7 +92,8 @@ class CompactionPickerTest : public testing::Test {
file_number, path_id, file_size,
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber);
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
vstorage_->AddFile(level, f);

View file

@ -3525,88 +3525,131 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
const int kValueSize = 100;
Options options = CurrentOptions();
options.compression = kNoCompression;
options.ttl = 24 * 60 * 60; // 24 hours
options.max_open_files = -1;
env_->time_elapse_only_sleep_ = false;
options.env = env_;
for (bool if_restart : {false, true}) {
for (bool if_open_all_files : {false, true}) {
Options options = CurrentOptions();
options.compression = kNoCompression;
options.ttl = 24 * 60 * 60; // 24 hours
if (if_open_all_files) {
options.max_open_files = -1;
} else {
options.max_open_files = 20;
}
// RocksDB sanitize max open files to at least 20. Modify it back.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = 2;
});
// In the case where all files are opened and doing DB restart
// forcing the oldest ancester time in manifest file to be 0 to
// simulate the case of reading from an old version.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
if (if_restart && if_open_all_files) {
std::string* encoded_fieled = static_cast<std::string*>(arg);
*encoded_fieled = "";
PutVarint64(encoded_fieled, 0);
}
});
env_->addon_time_.store(0);
DestroyAndReopen(options);
env_->time_elapse_only_sleep_ = false;
options.env = env_;
int ttl_compactions = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
auto compaction_reason = compaction->compaction_reason();
if (compaction_reason == CompactionReason::kTtl) {
ttl_compactions++;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
env_->addon_time_.store(0);
DestroyAndReopen(options);
// Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
Random rnd(301);
for (int i = 1; i <= 100; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
int ttl_compactions = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
auto compaction_reason = compaction->compaction_reason();
if (compaction_reason == CompactionReason::kTtl) {
ttl_compactions++;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
Random rnd(301);
for (int i = 1; i <= 100; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
for (int i = 101; i <= 200; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
// Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
for (int i = 1; i <= 50; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
for (int i = 51; i <= 150; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
MoveFilesToLevel(4);
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
// Add one L1 file with key range: [26, 75].
for (int i = 26; i <= 75; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
dbfull()->TEST_WaitForCompact();
MoveFilesToLevel(1);
ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
// LSM tree:
// L1: [26 .. 75]
// L4: [1 .. 50][51 ..... 150]
// L6: [1 ........ 100][101 .... 200]
//
// On TTL expiry, TTL compaction should be initiated on L1 file, and the
// compactions should keep going on until the key range hits bottom level.
// In other words: the compaction on this data range "cascasdes" until
// reaching the bottom level.
//
// Order of events on TTL expiry:
// 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
// ttl
// compaction.
// 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
// 3. The new output file from L4 falls to L5 via 1 trival move initiated
// by the ttl compaction.
// 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
// Add 25 hours and do a write
env_->addon_time_.fetch_add(25 * 60 * 60);
ASSERT_OK(Put(Key(1), "1"));
if (if_restart) {
Reopen(options);
} else {
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(5, ttl_compactions);
env_->addon_time_.fetch_add(25 * 60 * 60);
ASSERT_OK(Put(Key(2), "1"));
if (if_restart) {
Reopen(options);
} else {
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_GE(ttl_compactions, 6);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
Flush();
for (int i = 101; i <= 200; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
// Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
for (int i = 1; i <= 50; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
for (int i = 51; i <= 150; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
MoveFilesToLevel(4);
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
// Add one L1 file with key range: [26, 75].
for (int i = 26; i <= 75; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
}
Flush();
dbfull()->TEST_WaitForCompact();
MoveFilesToLevel(1);
ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
// LSM tree:
// L1: [26 .. 75]
// L4: [1 .. 50][51 ..... 150]
// L6: [1 ........ 100][101 .... 200]
//
// On TTL expiry, TTL compaction should be initiated on L1 file, and the
// compactions should keep going on until the key range hits bottom level.
// In other words: the compaction on this data range "cascasdes" until
// reaching the bottom level.
//
// Order of events on TTL expiry:
// 1. L1 file falls to L3 via 2 trivial moves which are initiated by the ttl
// compaction.
// 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
// 3. The new output file from L4 falls to L5 via 1 trival move initiated
// by the ttl compaction.
// 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
// Add 25 hours and do a write
env_->addon_time_.fetch_add(25 * 60 * 60);
ASSERT_OK(Put("a", "1"));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(5, ttl_compactions);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {

View file

@ -1256,7 +1256,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number);
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
@ -2671,7 +2672,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number);
f->oldest_blob_file_number, f->oldest_ancester_time);
ROCKS_LOG_BUFFER(
log_buffer,

View file

@ -128,7 +128,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number);
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time);
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),

View file

@ -1175,6 +1175,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
int64_t _current_time = 0;
env_->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
meta.oldest_ancester_time = current_time;
{
auto write_hint = cfd->CalculateSSTWriteHint(0);
@ -1224,7 +1225,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.oldest_blob_file_number);
meta.marked_for_compaction, meta.oldest_blob_file_number,
meta.oldest_ancester_time);
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);

View file

@ -3311,22 +3311,6 @@ TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
}
}
// Check that FIFO-with-TTL is not supported with max_open_files != -1.
TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
Options options;
options.compaction_style = kCompactionStyleFIFO;
options.create_if_missing = true;
options.ttl = 600; // seconds
// Check that it is not supported with max_open_files != -1.
options.max_open_files = 100;
options = CurrentOptions(options);
ASSERT_TRUE(TryReopen(options).IsNotSupported());
options.max_open_files = -1;
ASSERT_OK(TryReopen(options));
}
// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
Options options;
@ -6181,19 +6165,6 @@ TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
}
}
TEST_F(DBTest, CreateColumnFamilyShouldFailOnIncompatibleOptions) {
Options options = CurrentOptions();
options.max_open_files = 100;
Reopen(options);
ColumnFamilyOptions cf_options(options);
// ttl is only supported when max_open_files is -1.
cf_options.ttl = 3600;
ColumnFamilyHandle* handle;
ASSERT_NOK(db_->CreateColumnFamily(cf_options, "pikachu", &handle));
delete handle;
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, RowCache) {
Options options = CurrentOptions();

View file

@ -244,10 +244,18 @@ Status ExternalSstFileIngestionJob::Run() {
return status;
}
// We use the import time as the ancester time. This is the time the data
// is written to the database.
uint64_t oldest_ancester_time = 0;
int64_t temp_current_time = 0;
if (env_->GetCurrentTime(&temp_current_time).ok()) {
oldest_ancester_time = static_cast<uint64_t>(temp_current_time);
}
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
false, kInvalidBlobFileNumber);
false, kInvalidBlobFileNumber, oldest_ancester_time);
}
return status;
}

View file

@ -365,6 +365,10 @@ Status FlushJob::WriteLevel0Table() {
uint64_t oldest_key_time =
mems_.front()->ApproximateOldestKeyTime();
// It's not clear whether oldest_key_time is always available. In case
// it is not available, use current_time.
meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(),
@ -408,7 +412,8 @@ Status FlushJob::WriteLevel0Table() {
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.oldest_blob_file_number);
meta_.marked_for_compaction, meta_.oldest_blob_file_number,
meta_.oldest_ancester_time);
}
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first first flushed memtable.

View file

@ -133,6 +133,14 @@ Status ImportColumnFamilyJob::Run() {
Status status;
edit_.SetColumnFamily(cfd_->GetID());
// We use the import time as the ancester time. This is the time the data
// is written to the database.
uint64_t oldest_ancester_time = 0;
int64_t temp_current_time = 0;
if (env_->GetCurrentTime(&temp_current_time).ok()) {
oldest_ancester_time = static_cast<uint64_t>(temp_current_time);
}
for (size_t i = 0; i < files_to_import_.size(); ++i) {
const auto& f = files_to_import_[i];
const auto& file_metadata = metadata_[i];
@ -140,7 +148,8 @@ Status ImportColumnFamilyJob::Run() {
edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, file_metadata.smallest_seqno,
file_metadata.largest_seqno, false, kInvalidBlobFileNumber);
file_metadata.largest_seqno, false, kInvalidBlobFileNumber,
oldest_ancester_time);
// If incoming sequence number is higher, update local sequence number.
if (file_metadata.largest_seqno > versions_->LastSequence()) {

View file

@ -499,6 +499,7 @@ class Repairer {
status =
AddColumnFamily(props->column_family_name, t->column_family_id);
}
t->meta.oldest_ancester_time = props->creation_time;
}
ColumnFamilyData* cfd = nullptr;
if (status.ok()) {
@ -581,7 +582,8 @@ class Repairer {
table->meta.largest, table->meta.fd.smallest_seqno,
table->meta.fd.largest_seqno,
table->meta.marked_for_compaction,
table->meta.oldest_blob_file_number);
table->meta.oldest_blob_file_number,
table->meta.oldest_ancester_time);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);

View file

@ -62,7 +62,8 @@ class VersionBuilderTest : public testing::Test {
FileMetaData* f = new FileMetaData(
file_number, path_id, file_size, GetInternalKey(smallest, smallest_seq),
GetInternalKey(largest, largest_seq), smallest_seqno, largest_seqno,
/* marked_for_compact */ false, kInvalidBlobFileNumber);
/* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
f->compensated_file_size = file_size;
f->num_entries = num_entries;
f->num_deletions = num_deletions;
@ -113,7 +114,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.DeleteFile(3, 27U);
EnvOptions env_options;
@ -148,7 +149,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
VersionEdit version_edit;
version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
@ -186,7 +187,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
VersionEdit version_edit;
version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
version_edit.DeleteFile(4, 6U);
@ -215,19 +216,19 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
EnvOptions env_options;
@ -254,30 +255,30 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_builder.Apply(&version_edit);
VersionEdit version_edit2;
version_edit.AddFile(2, 808, 0, 100U, GetInternalKey("901"),
GetInternalKey("950"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_edit2.DeleteFile(2, 616);
version_edit2.DeleteFile(2, 636);
version_edit.AddFile(2, 806, 0, 100U, GetInternalKey("801"),
GetInternalKey("850"), 200, 200, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
version_builder.Apply(&version_edit2);
version_builder.SaveTo(&new_vstorage);

View file

@ -61,6 +61,7 @@ enum CustomTag : uint32_t {
// removed when manifest becomes forward-comptabile.
kMinLogNumberToKeepHack = 3,
kOldestBlobFileNumber = 4,
kOldestAncesterTime = 5,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
@ -178,82 +179,71 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (!f.smallest.Valid() || !f.largest.Valid()) {
return false;
}
bool has_customized_fields = false;
if (f.marked_for_compaction || has_min_log_number_to_keep_ ||
f.oldest_blob_file_number != kInvalidBlobFileNumber) {
PutVarint32(dst, kNewFile4);
has_customized_fields = true;
} else if (f.fd.GetPathId() == 0) {
// Use older format to make sure user can roll back the build if they
// don't config multiple DB paths.
PutVarint32(dst, kNewFile2);
} else {
PutVarint32(dst, kNewFile3);
}
PutVarint32(dst, kNewFile4);
PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber());
if (f.fd.GetPathId() != 0 && !has_customized_fields) {
// kNewFile3
PutVarint32(dst, f.fd.GetPathId());
}
PutVarint64(dst, f.fd.GetFileSize());
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
if (has_customized_fields) {
// Customized fields' format:
// +-----------------------------+
// | 1st field's tag (varint32) |
// +-----------------------------+
// | 1st field's size (varint32) |
// +-----------------------------+
// | bytes for 1st field |
// | (based on size decoded) |
// +-----------------------------+
// | |
// | ...... |
// | |
// +-----------------------------+
// | last field's size (varint32)|
// +-----------------------------+
// | bytes for last field |
// | (based on size decoded) |
// +-----------------------------+
// | terminating tag (varint32) |
// +-----------------------------+
//
// Customized encoding for fields:
// tag kPathId: 1 byte as path_id
// tag kNeedCompaction:
// now only can take one char value 1 indicating need-compaction
//
if (f.fd.GetPathId() != 0) {
PutVarint32(dst, CustomTag::kPathId);
char p = static_cast<char>(f.fd.GetPathId());
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (f.marked_for_compaction) {
PutVarint32(dst, CustomTag::kNeedCompaction);
char p = static_cast<char>(1);
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (has_min_log_number_to_keep_ && !min_log_num_written) {
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
std::string varint_log_number;
PutFixed64(&varint_log_number, min_log_number_to_keep_);
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
min_log_num_written = true;
}
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
PutVarint32(dst, CustomTag::kOldestBlobFileNumber);
std::string oldest_blob_file_number;
PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
}
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
dst);
PutVarint32(dst, CustomTag::kTerminate);
// Customized fields' format:
// +-----------------------------+
// | 1st field's tag (varint32) |
// +-----------------------------+
// | 1st field's size (varint32) |
// +-----------------------------+
// | bytes for 1st field |
// | (based on size decoded) |
// +-----------------------------+
// | |
// | ...... |
// | |
// +-----------------------------+
// | last field's size (varint32)|
// +-----------------------------+
// | bytes for last field |
// | (based on size decoded) |
// +-----------------------------+
// | terminating tag (varint32) |
// +-----------------------------+
//
// Customized encoding for fields:
// tag kPathId: 1 byte as path_id
// tag kNeedCompaction:
// now only can take one char value 1 indicating need-compaction
//
PutVarint32(dst, CustomTag::kOldestAncesterTime);
std::string varint_oldest_ancester_time;
PutVarint64(&varint_oldest_ancester_time, f.oldest_ancester_time);
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:VarintOldestAncesterTime",
&varint_oldest_ancester_time);
PutLengthPrefixedSlice(dst, Slice(varint_oldest_ancester_time));
if (f.fd.GetPathId() != 0) {
PutVarint32(dst, CustomTag::kPathId);
char p = static_cast<char>(f.fd.GetPathId());
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (f.marked_for_compaction) {
PutVarint32(dst, CustomTag::kNeedCompaction);
char p = static_cast<char>(1);
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (has_min_log_number_to_keep_ && !min_log_num_written) {
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
std::string varint_log_number;
PutFixed64(&varint_log_number, min_log_number_to_keep_);
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
min_log_num_written = true;
}
if (f.oldest_blob_file_number != kInvalidBlobFileNumber) {
PutVarint32(dst, CustomTag::kOldestBlobFileNumber);
std::string oldest_blob_file_number;
PutVarint64(&oldest_blob_file_number, f.oldest_blob_file_number);
PutLengthPrefixedSlice(dst, Slice(oldest_blob_file_number));
}
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
dst);
PutVarint32(dst, CustomTag::kTerminate);
}
// 0 is default and does not need to be explicitly written
@ -340,6 +330,11 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
return "path_id wrong vaue";
}
break;
case kOldestAncesterTime:
if (!GetVarint64(&field, &f.oldest_ancester_time)) {
return "invalid oldest ancester time";
}
break;
case kNeedCompaction:
if (field.size() != 1) {
return "need_compaction field wrong size";
@ -663,6 +658,8 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(" blob_file:");
AppendNumberTo(&r, f.oldest_blob_file_number);
}
r.append(" oldest_ancester_time:");
AppendNumberTo(&r, f.oldest_ancester_time);
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);

View file

@ -16,6 +16,7 @@
#include "db/dbformat.h"
#include "memory/arena.h"
#include "rocksdb/cache.h"
#include "table/table_reader.h"
#include "util/autovector.h"
namespace rocksdb {
@ -24,6 +25,7 @@ class VersionSet;
constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
constexpr uint64_t kInvalidBlobFileNumber = 0;
constexpr uint64_t kUnknownOldestAncesterTime = 0;
extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
@ -122,18 +124,25 @@ struct FileMetaData {
// refers to. 0 is an invalid value; BlobDB numbers the files starting from 1.
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
// The file could be the compaction output from other SST files, which could
// in turn be outputs for compact older SST files. We track the memtable
// flush timestamp for the oldest SST file that eventaully contribute data
// to this file. 0 means the information is not available.
uint64_t oldest_ancester_time = 0;
FileMetaData() = default;
FileMetaData(uint64_t file, uint32_t file_path_id, uint64_t file_size,
const InternalKey& smallest_key, const InternalKey& largest_key,
const SequenceNumber& smallest_seq,
const SequenceNumber& largest_seq, bool marked_for_compact,
uint64_t oldest_blob_file)
uint64_t oldest_blob_file, uint64_t _oldest_ancester_time)
: fd(file, file_path_id, file_size, smallest_seq, largest_seq),
smallest(smallest_key),
largest(largest_key),
marked_for_compaction(marked_for_compact),
oldest_blob_file_number(oldest_blob_file) {}
oldest_blob_file_number(oldest_blob_file),
oldest_ancester_time(_oldest_ancester_time) {}
// REQUIRED: Keys must be given to the function in sorted order (it expects
// the last key to be the largest).
@ -154,6 +163,19 @@ struct FileMetaData {
fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
fd.largest_seqno = std::max(fd.largest_seqno, seqno);
}
// Try to get oldest ancester time from the class itself or table properties
// if table reader is already pinned.
// 0 means the information is not available.
uint64_t TryGetOldestAncesterTime() {
if (oldest_ancester_time != 0) {
return oldest_ancester_time;
} else if (fd.table_reader != nullptr &&
fd.table_reader->GetTableProperties() != nullptr) {
return fd.table_reader->GetTableProperties()->creation_time;
}
return 0;
}
};
// A compressed copy of file meta data that just contain minimum data needed
@ -255,12 +277,14 @@ class VersionEdit {
uint64_t file_size, const InternalKey& smallest,
const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno, bool marked_for_compaction,
uint64_t oldest_blob_file_number) {
uint64_t oldest_blob_file_number,
uint64_t oldest_ancester_time) {
assert(smallest_seqno <= largest_seqno);
new_files_.emplace_back(
level, FileMetaData(file, file_path_id, file_size, smallest, largest,
smallest_seqno, largest_seqno,
marked_for_compaction, oldest_blob_file_number));
level,
FileMetaData(file, file_path_id, file_size, smallest, largest,
smallest_seqno, largest_seqno, marked_for_compaction,
oldest_blob_file_number, oldest_ancester_time));
}
void AddFile(int level, const FileMetaData& f) {

View file

@ -36,7 +36,8 @@ TEST_F(VersionEditTest, EncodeDecode) {
edit.AddFile(3, kBig + 300 + i, kBig32Bit + 400 + i, 0,
InternalKey("foo", kBig + 500 + i, kTypeValue),
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
kBig + 500 + i, kBig + 600 + i, false, kInvalidBlobFileNumber);
kBig + 500 + i, kBig + 600 + i, false, kInvalidBlobFileNumber,
888);
edit.DeleteFile(4, kBig + 700 + i);
}
@ -53,16 +54,18 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
VersionEdit edit;
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber);
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
kBig + 601, false, kInvalidBlobFileNumber);
kBig + 601, false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
edit.AddFile(5, 302, 0, 100, InternalKey("foo", kBig + 502, kTypeValue),
InternalKey("zoo", kBig + 602, kTypeDeletion), kBig + 502,
kBig + 602, true, kInvalidBlobFileNumber);
kBig + 602, true, kInvalidBlobFileNumber, 666);
edit.AddFile(5, 303, 0, 100, InternalKey("foo", kBig + 503, kTypeBlobIndex),
InternalKey("zoo", kBig + 603, kTypeBlobIndex), kBig + 503,
kBig + 603, true, 1001);
kBig + 603, true, 1001, kUnknownOldestAncesterTime);
edit.DeleteFile(4, 700);
@ -100,10 +103,11 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
VersionEdit edit;
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber);
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
kBig + 601, false, kInvalidBlobFileNumber);
kBig + 601, false, kInvalidBlobFileNumber, 686);
edit.DeleteFile(4, 700);
edit.SetComparatorName("foo");
@ -149,7 +153,8 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
VersionEdit edit;
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber);
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
edit.SetComparatorName("foo");
edit.SetLogNumber(kBig + 100);
@ -177,7 +182,7 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
TEST_F(VersionEditTest, EncodeEmptyFile) {
VersionEdit edit;
edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
std::string buffer;
ASSERT_TRUE(!edit.EncodeTo(&buffer));
}

View file

@ -2330,13 +2330,11 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
auto status = ioptions.env->GetCurrentTime(&_current_time);
if (status.ok()) {
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (auto f : files) {
if (!f->being_compacted && f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time > 0 &&
creation_time < (current_time - mutable_cf_options.ttl)) {
for (FileMetaData* f : files) {
if (!f->being_compacted) {
uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
if (oldest_ancester_time != 0 &&
oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
ttl_expired_files_count++;
}
}
@ -2489,12 +2487,11 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (int level = 0; level < num_levels() - 1; level++) {
for (auto f : files_[level]) {
if (!f->being_compacted && f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time > 0 && creation_time < (current_time - ttl)) {
for (FileMetaData* f : files_[level]) {
if (!f->being_compacted) {
uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
if (oldest_ancester_time > 0 &&
oldest_ancester_time < (current_time - ttl)) {
expired_ttl_files_.emplace_back(level, f);
}
}
@ -2539,8 +2536,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
uint64_t file_modification_time =
f->fd.table_reader->GetTableProperties()->file_creation_time;
if (file_modification_time == 0) {
file_modification_time =
f->fd.table_reader->GetTableProperties()->creation_time;
file_modification_time = f->TryGetOldestAncesterTime();
}
if (file_modification_time == 0) {
auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
@ -4984,7 +4980,8 @@ Status VersionSet::WriteCurrentStateToManifest(log::Writer* log) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number);
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time);
}
}
edit.SetLogNumber(cfd->GetLogNumber());

View file

@ -39,7 +39,8 @@ class GenerateLevelFilesBriefTest : public testing::Test {
files_.size() + 1, 0, 0,
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber);
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
files_.push_back(f);
}
@ -133,7 +134,8 @@ class VersionStorageInfoTest : public testing::Test {
FileMetaData* f = new FileMetaData(
file_number, 0, file_size, GetInternalKey(smallest, 0),
GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
/* marked_for_compact */ false, kInvalidBlobFileNumber);
/* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime);
f->compensated_file_size = file_size;
vstorage_.AddFile(level, f);
}
@ -144,7 +146,7 @@ class VersionStorageInfoTest : public testing::Test {
FileMetaData* f = new FileMetaData(
file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
/* largest_seq */ 0, /* marked_for_compact */ false,
kInvalidBlobFileNumber);
kInvalidBlobFileNumber, kUnknownOldestAncesterTime);
f->compensated_file_size = file_size;
vstorage_.AddFile(level, f);
}