Add support to bulk load external files with user-defined timestamps (#12343)

Summary:
This PR adds initial support to bulk loading external sst files with user-defined timestamps.

To ensure this invariant is met while ingesting external files:
     assume there are two internal keys: <K, ts1, seq1> and <K, ts2, seq2>, the following should hold:
     ts1 < ts2 iff. seq1 < seq2

These extra requirements are added for ingesting external files with user-defined timestamps:
1) A file with overlapping user key (without timestamp) range with the db cannot be ingested. This is because we cannot ensure above invariant is met without checking each overlapped key's timestamp and compare it with the timestamp from the db. This is an expensive step. This bulk loading feature will be used by MyRocks and currently their usage can guarantee ingested file's key range doesn't overlap with db.
4f3a57a13f/storage/rocksdb/ha_rocksdb.cc (L3312)
We can consider loose this requirement by doing this check in the future, this initial support just disallow this.

2) Files with overlapping user key (without timestamp) range are not allowed to be ingested. For similar reasons, it's hard to ensure above invariant is met. For example, if we have two files where user keys are interleaved like this:
file1: [c10, c8, f10, f5]
file2: [b5, c11, f4]
Either file1 gets a bigger global seqno than file2, or the other way around, above invariant cannot be met.
So we disallow this.

2) When a column family enables user-defined timestamps, it doesn't support ingestion behind mode. Ingestion behind currently simply puts the file at the bottommost level, and assign a global seqno 0 to the file. We need to do similar search though the LSM tree for key range overlap checks to make sure aformentioned invariant is met. So this initial support disallow this mode. We can consider adding it in the future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12343

Test Plan: Add unit tests

Reviewed By: cbi42

Differential Revision: D53686182

Pulled By: jowlyzhang

fbshipit-source-id: f05e3fb27967f7974ed40179d78634c40ecfb136
This commit is contained in:
Yu Zhang 2024-02-13 11:15:28 -08:00 committed by Facebook GitHub Bot
parent 45668a05f5
commit 10d02456b6
4 changed files with 332 additions and 26 deletions

View File

@ -5824,10 +5824,18 @@ Status DBImpl::IngestExternalFiles(
}
for (const auto& arg : args) {
const IngestExternalFileOptions& ingest_opts = arg.options;
if (ingest_opts.ingest_behind &&
!immutable_db_options_.allow_ingest_behind) {
return Status::InvalidArgument(
"can't ingest_behind file in DB with allow_ingest_behind=false");
if (ingest_opts.ingest_behind) {
if (!immutable_db_options_.allow_ingest_behind) {
return Status::InvalidArgument(
"can't ingest_behind file in DB with allow_ingest_behind=false");
}
auto ucmp = arg.column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
return Status::NotSupported(
"Column family with user-defined "
"timestamps enabled doesn't support ingest behind.");
}
}
}

View File

@ -102,6 +102,12 @@ Status ExternalSstFileIngestionJob::Prepare(
"behind mode.");
}
if (ucmp->timestamp_size() > 0 && files_overlap_) {
return Status::NotSupported(
"Files with overlapping ranges cannot be ingested to column "
"family with user-defined timestamp enabled.");
}
// Copy/Move external files into DB
std::unordered_set<size_t> ingestion_path_ids;
for (IngestedFileInfo& f : files_to_ingest_) {
@ -317,11 +323,6 @@ Status ExternalSstFileIngestionJob::Prepare(
}
}
if (!status.ok()) {
// We failed, remove all files that we copied into the db
DeleteInternalFiles();
}
return status;
}
@ -329,26 +330,25 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
SuperVersion* super_version) {
size_t n = files_to_ingest_.size();
autovector<UserKeyRange> ranges;
std::vector<std::string> keys;
ranges.reserve(n);
keys.reserve(2 * n);
size_t ts_sz = cfd_->user_comparator()->timestamp_size();
// Check all ranges [begin, end] inclusively.
for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
Slice start_ukey = file_to_ingest.smallest_internal_key.user_key();
Slice end_ukey = file_to_ingest.largest_internal_key.user_key();
auto [start, end] = MaybeAddTimestampsToRange(
&start_ukey, &end_ukey, ts_sz, &keys.emplace_back(),
&keys.emplace_back(), /*exclusive_end=*/false);
assert(start.has_value());
assert(end.has_value());
ranges.emplace_back(start.value(), end.value());
ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
file_to_ingest.largest_internal_key.user_key());
}
Status status = cfd_->RangesOverlapWithMemtables(
ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
if (status.ok() && *flush_needed &&
!ingestion_options_.allow_blocking_flush) {
status = Status::InvalidArgument("External file requires flush");
if (status.ok() && *flush_needed) {
if (!ingestion_options_.allow_blocking_flush) {
status = Status::InvalidArgument("External file requires flush");
}
auto ucmp = cfd_->user_comparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
status = Status::InvalidArgument(
"Column family enables user-defined timestamps, please make "
"sure the key range (without timestamp) of external file does not "
"overlap with key range in the memtables.");
}
}
return status;
}
@ -871,10 +871,13 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SequenceNumber* assigned_seqno) {
Status status;
*assigned_seqno = 0;
auto ucmp = cfd_->user_comparator();
const size_t ts_sz = ucmp->timestamp_size();
if (force_global_seqno || files_overlap_) {
*assigned_seqno = last_seqno + 1;
// If files overlap, we have to ingest them at level 0.
if (files_overlap_) {
assert(ts_sz == 0);
file_to_ingest->picked_level = 0;
if (ingestion_options_.fail_if_not_bottommost_level) {
status = Status::TryAgain(
@ -946,8 +949,16 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
&overlap_with_db);
file_to_ingest->picked_level = target_level;
if (overlap_with_db && *assigned_seqno == 0) {
*assigned_seqno = last_seqno + 1;
if (overlap_with_db) {
if (ts_sz > 0) {
status = Status::InvalidArgument(
"Column family enables user-defined timestamps, please make sure the "
"key range (without timestamp) of external file does not overlap "
"with key range (without timestamp) in the db");
}
if (*assigned_seqno == 0) {
*assigned_seqno = last_seqno + 1;
}
}
return status;
}

View File

@ -3063,6 +3063,285 @@ TEST_P(ExternalSSTFileTest,
delete iter;
}
class ExternalSSTFileWithTimestampTest : public ExternalSSTFileTest {
public:
ExternalSSTFileWithTimestampTest() = default;
static const std::string kValueNotFound;
static const std::string kTsNotFound;
std::string EncodeAsUint64(uint64_t v) {
std::string dst;
PutFixed64(&dst, v);
return dst;
}
Status IngestExternalUDTFile(const std::vector<std::string>& files,
bool allow_global_seqno = true) {
IngestExternalFileOptions opts;
opts.snapshot_consistency = true;
opts.allow_global_seqno = allow_global_seqno;
return db_->IngestExternalFile(files, opts);
}
void VerifyValueAndTs(const std::string& key,
const std::string& read_timestamp,
const std::string& expected_value,
const std::string& expected_timestamp) {
Slice read_ts = read_timestamp;
ReadOptions read_options;
read_options.timestamp = &read_ts;
std::string value;
std::string timestamp;
Status s = db_->Get(read_options, key, &value, &timestamp);
if (s.ok()) {
ASSERT_EQ(value, expected_value);
ASSERT_EQ(timestamp, expected_timestamp);
} else if (s.IsNotFound()) {
ASSERT_EQ(kValueNotFound, expected_value);
ASSERT_EQ(kTsNotFound, expected_timestamp);
} else {
assert(false);
}
}
};
const std::string ExternalSSTFileWithTimestampTest::kValueNotFound =
"NOT_FOUND";
const std::string ExternalSSTFileWithTimestampTest::kTsNotFound =
"NOT_FOUND_TS";
TEST_F(ExternalSSTFileWithTimestampTest, Basic) {
do {
Options options = CurrentOptions();
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = true;
DestroyAndReopen(options);
SstFileWriter sst_file_writer(EnvOptions(), options);
// Current file size should be 0 after sst_file_writer init and before open
// a file.
ASSERT_EQ(sst_file_writer.FileSize(), 0);
// file1.sst [0, 50)
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 0; k < 50; k++) {
// write 3 versions of values for each key, write newer version first
// they are treated as logically smaller by the comparator.
for (int version = 3; version > 0; version--) {
ASSERT_OK(
sst_file_writer.Put(Key(k), EncodeAsUint64(k + version),
Key(k) + "_val" + std::to_string(version)));
}
}
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
// sst_file_writer already finished, cannot add this value
ASSERT_NOK(sst_file_writer.Put(Key(100), EncodeAsUint64(1), "bad_val"));
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 150);
ASSERT_EQ(file1_info.smallest_key, Key(0) + EncodeAsUint64(0 + 3));
ASSERT_EQ(file1_info.largest_key, Key(49) + EncodeAsUint64(49 + 1));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
// Add file using file path
ASSERT_OK(IngestExternalUDTFile({file1}));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 50; k++) {
for (int version = 3; version > 0; version--) {
VerifyValueAndTs(Key(k), EncodeAsUint64(k + version),
Key(k) + "_val" + std::to_string(version),
EncodeAsUint64(k + version));
}
}
// file2.sst [50, 200)
// Put [key=k, ts=k, value=k_val] for k in [50, 200)
// RangeDelete[start_key=75, end_key=125, ts=100]
std::string file2 = sst_files_dir_ + "file2.sst";
int range_del_begin = 75, range_del_end = 125, range_del_ts = 100;
ASSERT_OK(sst_file_writer.Open(file2));
for (int k = 50; k < 200; k++) {
ASSERT_OK(
sst_file_writer.Put(Key(k), EncodeAsUint64(k), Key(k) + "_val"));
if (k == range_del_ts) {
ASSERT_OK(sst_file_writer.DeleteRange(
Key(range_del_begin), Key(range_del_end), EncodeAsUint64(k)));
}
}
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
// Current file size should be non-zero after success write.
ASSERT_GT(sst_file_writer.FileSize(), 0);
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 150);
ASSERT_EQ(file2_info.smallest_key, Key(50) + EncodeAsUint64(50));
ASSERT_EQ(file2_info.largest_key, Key(199) + EncodeAsUint64(199));
ASSERT_EQ(file2_info.num_range_del_entries, 1);
ASSERT_EQ(file2_info.smallest_range_del_key,
Key(range_del_begin) + EncodeAsUint64(range_del_ts));
ASSERT_EQ(file2_info.largest_range_del_key,
Key(range_del_end) + EncodeAsUint64(range_del_ts));
// Add file using file path
ASSERT_OK(IngestExternalUDTFile({file2}));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 50; k < 200; k++) {
if (k < range_del_begin || k >= range_del_end) {
VerifyValueAndTs(Key(k), EncodeAsUint64(k), Key(k) + "_val",
EncodeAsUint64(k));
}
// else {
// // FIXME(yuzhangyu): when range tombstone and point data has the
// // same seq, on read path, make range tombstone overrides point
// // data if it has a newer user-defined timestamp. This is how
// // we determine point data's overriding relationship, so we
// // should keep it consistent.
// VerifyValueAndTs(Key(k), EncodeAsUint64(k), Key(k) + "_val",
// EncodeAsUint64(k));
// VerifyValueAndTs(Key(k), EncodeAsUint64(range_del_ts),
// kValueNotFound,
// kTsNotFound);
// }
}
// file3.sst [100, 200), key range overlap with db
std::string file3 = sst_files_dir_ + "file3.sst";
ASSERT_OK(sst_file_writer.Open(file3));
for (int k = 100; k < 200; k++) {
ASSERT_OK(
sst_file_writer.Put(Key(k), EncodeAsUint64(k + 1), Key(k) + "_val1"));
}
ExternalSstFileInfo file3_info;
ASSERT_OK(sst_file_writer.Finish(&file3_info));
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 100);
ASSERT_EQ(file3_info.smallest_key, Key(100) + EncodeAsUint64(101));
ASSERT_EQ(file3_info.largest_key, Key(199) + EncodeAsUint64(200));
// Allowing ingesting a file containing overlap key range with the db is
// not safe without verifying the overlapped key has a higher timestamp
// than what the db contains, so we do not allow this regardless of
// whether global sequence number is allowed.
ASSERT_NOK(IngestExternalUDTFile({file2}));
ASSERT_NOK(IngestExternalUDTFile({file2}, /*allow_global_seqno*/ false));
// Write [0, 50)
// Write to DB newer versions to cover ingested data and move sequence
// number forward.
for (int k = 0; k < 50; k++) {
ASSERT_OK(dbfull()->Put(WriteOptions(), Key(k), EncodeAsUint64(k + 4),
Key(k) + "_val" + std::to_string(4)));
}
// Read all 4 versions (3 from ingested, 1 from live writes).
for (int k = 0; k < 50; k++) {
for (int version = 4; version > 0; version--) {
VerifyValueAndTs(Key(k), EncodeAsUint64(k + version),
Key(k) + "_val" + std::to_string(version),
EncodeAsUint64(k + version));
}
}
SequenceNumber seq_num_before_ingestion = db_->GetLatestSequenceNumber();
ASSERT_GT(seq_num_before_ingestion, 0U);
// file4.sst [200, 250)
std::string file4 = sst_files_dir_ + "file4.sst";
ASSERT_OK(sst_file_writer.Open(file4));
for (int k = 200; k < 250; k++) {
ASSERT_OK(
sst_file_writer.Put(Key(k), EncodeAsUint64(k), Key(k) + "_val"));
}
ExternalSstFileInfo file4_info;
ASSERT_OK(sst_file_writer.Finish(&file4_info));
// Current file size should be non-zero after success write.
ASSERT_GT(sst_file_writer.FileSize(), 0);
ASSERT_EQ(file4_info.file_path, file4);
ASSERT_EQ(file4_info.num_entries, 50);
ASSERT_EQ(file4_info.smallest_key, Key(200) + EncodeAsUint64(200));
ASSERT_EQ(file4_info.largest_key, Key(249) + EncodeAsUint64(249));
ASSERT_EQ(file4_info.num_range_del_entries, 0);
ASSERT_EQ(file4_info.smallest_range_del_key, "");
ASSERT_EQ(file4_info.largest_range_del_key, "");
ASSERT_OK(IngestExternalUDTFile({file4}));
// In UDT mode, any external file that can be successfully ingested also
// should not overlap with the db. As a result, they can always get the
// seq 0 assigned.
ASSERT_EQ(db_->GetLatestSequenceNumber(), seq_num_before_ingestion);
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}
TEST_F(ExternalSSTFileWithTimestampTest, SanityCheck) {
Options options = CurrentOptions();
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = true;
DestroyAndReopen(options);
SstFileWriter sst_file_writer(EnvOptions(), options);
// file1.sst [0, 100)
std::string file1 = sst_files_dir_ + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 0; k < 100; k++) {
ASSERT_OK(sst_file_writer.Put(Key(k), EncodeAsUint64(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
ASSERT_OK(sst_file_writer.Finish(&file1_info));
// file2.sst [50, 75)
std::string file2 = sst_files_dir_ + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
for (int k = 50; k < 75; k++) {
ASSERT_OK(
sst_file_writer.Put(Key(k), EncodeAsUint64(k + 2), Key(k) + "_val"));
}
ExternalSstFileInfo file2_info;
ASSERT_OK(sst_file_writer.Finish(&file2_info));
// Cannot ingest when files' user key range overlaps. There is no
// straightforward way to assign sequence number to the files so that they
// meet the user-defined timestamps invariant: for the same user provided key,
// the entry with a higher sequence number should not have a smaller
// timestamp. In this case: file1 has (key=k, ts=k) for k in [50, 75),
// file2 has (key=k, ts=k+2) for k in [50, 75).
// The invariant is only met if file2 is ingested after file1. In other cases
// when user key ranges are interleaved in files, no order of ingestion can
// guarantee this invariant. So we do not allow ingesting files with
// overlapping key ranges.
ASSERT_TRUE(IngestExternalUDTFile({file1, file2}).IsNotSupported());
options.allow_ingest_behind = true;
DestroyAndReopen(options);
IngestExternalFileOptions opts;
// TODO(yuzhangyu): support ingestion behind for user-defined timestamps?
// Ingesting external files with user-defined timestamps requires searching
// through the whole lsm tree to make sure there is no key range overlap with
// the db. Ingestion behind currently is doing a simply placing it at the
// bottom level step without a search, so we don't allow it either.
opts.ingest_behind = true;
ASSERT_TRUE(db_->IngestExternalFile({file1}, opts).IsNotSupported());
DestroyAndRecreateExternalSSTFilesDir();
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),

View File

@ -1805,6 +1805,14 @@ class DB {
// to Flush the memtable first before ingesting the file.
// In the second mode we will always ingest in the bottom most level (see
// docs to IngestExternalFileOptions::ingest_behind).
// For a column family that enables user-defined timestamps, ingesting
// external SST files are supported with these limitations: 1) Ingested file's
// user key (without timestamp) range should not overlap with the db's key
// range. 2) When ingesting multiple external SST files, their key ranges
// should not overlap with each other either. 3) Ingestion behind mode is not
// supported. 4) When an ingested file contains point data and range deletion
// for the same key, the point data currently overrides the range deletion
// regardless which one has the higher user-defined timestamps.
//
// (1) External SST files can be created using SstFileWriter
// (2) We will try to ingest the files to the lowest possible level