mirror of https://github.com/facebook/rocksdb.git
Add support to bulk load external files for UDT in memtable only feature (#12356)
Summary: This PR expands on the capabilities added in https://github.com/facebook/rocksdb/issues/12343. It adds sanity checks for external file's comparator name and user-defined timestamps related flag. With this, it now supports ingesting files to a column family that enables user-defined timestamps in Memtable only feature. Two fields in the table properties are used for aformentioned check: 1) the comparator name, it records what comparator is used to create this external sst file, 2) the flag `user_defined_timestamps_persisted`. We compare these two fields with the column family's settings. The details are in util function `ValidateUserDefinedTimestampsOptions`. To optimize for the majority of the cases where sanity check should pass and the table properties read should not affect how `TableReader` is constructed, instead of read the table properties block separately and use it for sanity check before creating a `TableReader`. We continue using the current flow to first create a `TableReader`, use it for reading table properties and do sanity checks, and reset the`TableReader` for the case where the column family enables UDTs in memtable only feature, and the external file does not contain user-defined timestamps. This PR also groups other table properties related sanity check in function `GetIngestedFileInfo` into the newly added `SanityCheckTableProperties` function. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12356 Test Plan: added unit test existing unit test Reviewed By: cbi42 Differential Revision: D54025116 Pulled By: jowlyzhang fbshipit-source-id: a918276c15f9908bd9df8513ce667638882e1554
This commit is contained in:
parent
8e29f243c9
commit
f1ca47b904
|
@ -883,9 +883,8 @@ int main(int argc, char** argv) {
|
||||||
StartPhase("addfile");
|
StartPhase("addfile");
|
||||||
{
|
{
|
||||||
rocksdb_envoptions_t* env_opt = rocksdb_envoptions_create();
|
rocksdb_envoptions_t* env_opt = rocksdb_envoptions_create();
|
||||||
rocksdb_options_t* io_options = rocksdb_options_create();
|
|
||||||
rocksdb_sstfilewriter_t* writer =
|
rocksdb_sstfilewriter_t* writer =
|
||||||
rocksdb_sstfilewriter_create(env_opt, io_options);
|
rocksdb_sstfilewriter_create(env_opt, options);
|
||||||
|
|
||||||
remove(sstfilename);
|
remove(sstfilename);
|
||||||
rocksdb_sstfilewriter_open(writer, sstfilename, &err);
|
rocksdb_sstfilewriter_open(writer, sstfilename, &err);
|
||||||
|
@ -944,7 +943,6 @@ int main(int argc, char** argv) {
|
||||||
|
|
||||||
rocksdb_ingestexternalfileoptions_destroy(ing_opt);
|
rocksdb_ingestexternalfileoptions_destroy(ing_opt);
|
||||||
rocksdb_sstfilewriter_destroy(writer);
|
rocksdb_sstfilewriter_destroy(writer);
|
||||||
rocksdb_options_destroy(io_options);
|
|
||||||
rocksdb_envoptions_destroy(env_opt);
|
rocksdb_envoptions_destroy(env_opt);
|
||||||
|
|
||||||
// Delete all keys we just ingested
|
// Delete all keys we just ingested
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
// COPYING file in the root directory) and Apache 2.0 License
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
// (found in the LICENSE.Apache file in the root directory).
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
|
||||||
#include "db/external_sst_file_ingestion_job.h"
|
#include "db/external_sst_file_ingestion_job.h"
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
@ -23,7 +22,7 @@
|
||||||
#include "table/table_builder.h"
|
#include "table/table_builder.h"
|
||||||
#include "table/unique_id_impl.h"
|
#include "table/unique_id_impl.h"
|
||||||
#include "test_util/sync_point.h"
|
#include "test_util/sync_point.h"
|
||||||
#include "util/stop_watch.h"
|
#include "util/udt_util.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
@ -465,8 +464,7 @@ Status ExternalSstFileIngestionJob::Run() {
|
||||||
? kReservedEpochNumberForFileIngestedBehind
|
? kReservedEpochNumberForFileIngestedBehind
|
||||||
: cfd_->NewEpochNumber(),
|
: cfd_->NewEpochNumber(),
|
||||||
f.file_checksum, f.file_checksum_func_name, f.unique_id, 0, tail_size,
|
f.file_checksum, f.file_checksum_func_name, f.unique_id, 0, tail_size,
|
||||||
static_cast<bool>(
|
f.user_defined_timestamps_persisted);
|
||||||
f.table_properties.user_defined_timestamps_persisted));
|
|
||||||
f_metadata.temperature = f.file_temperature;
|
f_metadata.temperature = f.file_temperature;
|
||||||
edit_.AddFile(f.picked_level, f_metadata);
|
edit_.AddFile(f.picked_level, f_metadata);
|
||||||
}
|
}
|
||||||
|
@ -644,38 +642,21 @@ void ExternalSstFileIngestionJob::DeleteInternalFiles() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
Status ExternalSstFileIngestionJob::ResetTableReader(
|
||||||
const std::string& external_file, uint64_t new_file_number,
|
const std::string& external_file, uint64_t new_file_number,
|
||||||
IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
|
bool user_defined_timestamps_persisted, SuperVersion* sv,
|
||||||
file_to_ingest->external_file_path = external_file;
|
IngestedFileInfo* file_to_ingest,
|
||||||
|
std::unique_ptr<TableReader>* table_reader) {
|
||||||
// Get external file size
|
|
||||||
Status status = fs_->GetFileSize(external_file, IOOptions(),
|
|
||||||
&file_to_ingest->file_size, nullptr);
|
|
||||||
if (!status.ok()) {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assign FD with number
|
|
||||||
file_to_ingest->fd =
|
|
||||||
FileDescriptor(new_file_number, 0, file_to_ingest->file_size);
|
|
||||||
|
|
||||||
// Create TableReader for external file
|
|
||||||
std::unique_ptr<TableReader> table_reader;
|
|
||||||
std::unique_ptr<FSRandomAccessFile> sst_file;
|
std::unique_ptr<FSRandomAccessFile> sst_file;
|
||||||
std::unique_ptr<RandomAccessFileReader> sst_file_reader;
|
Status status =
|
||||||
|
|
||||||
status =
|
|
||||||
fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
|
fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
sst_file_reader.reset(new RandomAccessFileReader(
|
std::unique_ptr<RandomAccessFileReader> sst_file_reader(
|
||||||
std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
|
new RandomAccessFileReader(std::move(sst_file), external_file,
|
||||||
|
nullptr /*Env*/, io_tracer_));
|
||||||
// TODO(yuzhangyu): User-defined timestamps doesn't support external sst file
|
table_reader->reset();
|
||||||
// ingestion. Pass in the correct `user_defined_timestamps_persisted` flag
|
|
||||||
// for creating `TableReaderOptions` when the support is there.
|
|
||||||
status = cfd_->ioptions()->table_factory->NewTableReader(
|
status = cfd_->ioptions()->table_factory->NewTableReader(
|
||||||
TableReaderOptions(
|
TableReaderOptions(
|
||||||
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
|
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
|
||||||
|
@ -685,28 +666,20 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||||
/*force_direct_prefetch*/ false, /*level*/ -1,
|
/*force_direct_prefetch*/ false, /*level*/ -1,
|
||||||
/*block_cache_tracer*/ nullptr,
|
/*block_cache_tracer*/ nullptr,
|
||||||
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
|
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
|
||||||
/*cur_file_num*/ new_file_number),
|
/*cur_file_num*/ new_file_number,
|
||||||
std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
|
/* unique_id */ {}, /* largest_seqno */ 0,
|
||||||
if (!status.ok()) {
|
/* tail_size */ 0, user_defined_timestamps_persisted),
|
||||||
return status;
|
std::move(sst_file_reader), file_to_ingest->file_size, table_reader);
|
||||||
}
|
return status;
|
||||||
|
}
|
||||||
if (ingestion_options_.verify_checksums_before_ingest) {
|
|
||||||
// If customized readahead size is needed, we can pass a user option
|
|
||||||
// all the way to here. Right now we just rely on the default readahead
|
|
||||||
// to keep things simple.
|
|
||||||
// TODO: plumb Env::IOActivity, Env::IOPriority
|
|
||||||
ReadOptions ro;
|
|
||||||
ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
|
|
||||||
status = table_reader->VerifyChecksum(
|
|
||||||
ro, TableReaderCaller::kExternalSSTIngestion);
|
|
||||||
if (!status.ok()) {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
Status ExternalSstFileIngestionJob::SanityCheckTableProperties(
|
||||||
|
const std::string& external_file, uint64_t new_file_number,
|
||||||
|
SuperVersion* sv, IngestedFileInfo* file_to_ingest,
|
||||||
|
std::unique_ptr<TableReader>* table_reader) {
|
||||||
// Get the external file properties
|
// Get the external file properties
|
||||||
auto props = table_reader->GetTableProperties();
|
auto props = table_reader->get()->GetTableProperties();
|
||||||
|
assert(props.get());
|
||||||
const auto& uprops = props->user_collected_properties;
|
const auto& uprops = props->user_collected_properties;
|
||||||
|
|
||||||
// Get table version
|
// Get table version
|
||||||
|
@ -744,10 +717,99 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||||
} else {
|
} else {
|
||||||
return Status::InvalidArgument("External file version is not supported");
|
return Status::InvalidArgument("External file version is not supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
|
||||||
|
// This assignment works fine even though `table_reader` may later be reset,
|
||||||
|
// since that will not affect how table properties are parsed, and this
|
||||||
|
// assignment is making a copy.
|
||||||
|
file_to_ingest->table_properties = *props;
|
||||||
|
|
||||||
// Get number of entries in table
|
// Get number of entries in table
|
||||||
file_to_ingest->num_entries = props->num_entries;
|
file_to_ingest->num_entries = props->num_entries;
|
||||||
file_to_ingest->num_range_deletions = props->num_range_deletions;
|
file_to_ingest->num_range_deletions = props->num_range_deletions;
|
||||||
|
|
||||||
|
// Validate table properties related to comparator name and user defined
|
||||||
|
// timestamps persisted flag.
|
||||||
|
file_to_ingest->user_defined_timestamps_persisted =
|
||||||
|
static_cast<bool>(props->user_defined_timestamps_persisted);
|
||||||
|
bool mark_sst_file_has_no_udt = false;
|
||||||
|
Status s = ValidateUserDefinedTimestampsOptions(
|
||||||
|
cfd_->user_comparator(), props->comparator_name,
|
||||||
|
cfd_->ioptions()->persist_user_defined_timestamps,
|
||||||
|
file_to_ingest->user_defined_timestamps_persisted,
|
||||||
|
&mark_sst_file_has_no_udt);
|
||||||
|
if (s.ok() && mark_sst_file_has_no_udt) {
|
||||||
|
// A column family that enables user-defined timestamps in Memtable only
|
||||||
|
// feature can also ingest external files created by a setting that disables
|
||||||
|
// user-defined timestamps. In that case, we need to re-mark the
|
||||||
|
// user_defined_timestamps_persisted flag for the file.
|
||||||
|
file_to_ingest->user_defined_timestamps_persisted = false;
|
||||||
|
} else if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
// `TableReader` is initialized with `user_defined_timestamps_persisted` flag
|
||||||
|
// to be true. If its value changed to false after this sanity check, we
|
||||||
|
// need to reset the `TableReader`.
|
||||||
|
auto ucmp = cfd_->user_comparator();
|
||||||
|
assert(ucmp);
|
||||||
|
if (ucmp->timestamp_size() > 0 &&
|
||||||
|
!file_to_ingest->user_defined_timestamps_persisted) {
|
||||||
|
s = ResetTableReader(external_file, new_file_number,
|
||||||
|
file_to_ingest->user_defined_timestamps_persisted, sv,
|
||||||
|
file_to_ingest, table_reader);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||||
|
const std::string& external_file, uint64_t new_file_number,
|
||||||
|
IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
|
||||||
|
file_to_ingest->external_file_path = external_file;
|
||||||
|
|
||||||
|
// Get external file size
|
||||||
|
Status status = fs_->GetFileSize(external_file, IOOptions(),
|
||||||
|
&file_to_ingest->file_size, nullptr);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign FD with number
|
||||||
|
file_to_ingest->fd =
|
||||||
|
FileDescriptor(new_file_number, 0, file_to_ingest->file_size);
|
||||||
|
|
||||||
|
// Create TableReader for external file
|
||||||
|
std::unique_ptr<TableReader> table_reader;
|
||||||
|
// Initially create the `TableReader` with flag
|
||||||
|
// `user_defined_timestamps_persisted` to be true since that's the most common
|
||||||
|
// case
|
||||||
|
status = ResetTableReader(external_file, new_file_number,
|
||||||
|
/*user_defined_timestamps_persisted=*/true, sv,
|
||||||
|
file_to_ingest, &table_reader);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = SanityCheckTableProperties(external_file, new_file_number, sv,
|
||||||
|
file_to_ingest, &table_reader);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ingestion_options_.verify_checksums_before_ingest) {
|
||||||
|
// If customized readahead size is needed, we can pass a user option
|
||||||
|
// all the way to here. Right now we just rely on the default readahead
|
||||||
|
// to keep things simple.
|
||||||
|
// TODO: plumb Env::IOActivity, Env::IOPriority
|
||||||
|
ReadOptions ro;
|
||||||
|
ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
|
||||||
|
status = table_reader->VerifyChecksum(
|
||||||
|
ro, TableReaderCaller::kExternalSSTIngestion);
|
||||||
|
if (!status.ok()) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ParsedInternalKey key;
|
ParsedInternalKey key;
|
||||||
// TODO: plumb Env::IOActivity, Env::IOPriority
|
// TODO: plumb Env::IOActivity, Env::IOPriority
|
||||||
ReadOptions ro;
|
ReadOptions ro;
|
||||||
|
@ -820,7 +882,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||||
table_reader->NewRangeTombstoneIterator(ro));
|
table_reader->NewRangeTombstoneIterator(ro));
|
||||||
// We may need to adjust these key bounds, depending on whether any range
|
// We may need to adjust these key bounds, depending on whether any range
|
||||||
// deletion tombstones extend past them.
|
// deletion tombstones extend past them.
|
||||||
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
|
const Comparator* ucmp = cfd_->user_comparator();
|
||||||
if (range_del_iter != nullptr) {
|
if (range_del_iter != nullptr) {
|
||||||
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
|
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
|
||||||
range_del_iter->Next()) {
|
range_del_iter->Next()) {
|
||||||
|
@ -848,13 +910,11 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
|
auto s =
|
||||||
|
GetSstInternalUniqueId(file_to_ingest->table_properties.db_id,
|
||||||
file_to_ingest->table_properties = *props;
|
file_to_ingest->table_properties.db_session_id,
|
||||||
|
file_to_ingest->table_properties.orig_file_number,
|
||||||
auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id,
|
&(file_to_ingest->unique_id));
|
||||||
props->orig_file_number,
|
|
||||||
&(file_to_ingest->unique_id));
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_WARN(db_options_.info_log,
|
ROCKS_LOG_WARN(db_options_.info_log,
|
||||||
"Failed to get SST unique id for file %s",
|
"Failed to get SST unique id for file %s",
|
||||||
|
|
|
@ -73,6 +73,14 @@ struct IngestedFileInfo {
|
||||||
Temperature file_temperature = Temperature::kUnknown;
|
Temperature file_temperature = Temperature::kUnknown;
|
||||||
// Unique id of the file to be ingested
|
// Unique id of the file to be ingested
|
||||||
UniqueId64x2 unique_id{};
|
UniqueId64x2 unique_id{};
|
||||||
|
// Whether the external file should be treated as if it has user-defined
|
||||||
|
// timestamps or not. If this flag is false, and the column family enables
|
||||||
|
// UDT feature, the file will have min-timestamp artificially padded to its
|
||||||
|
// user keys when it's read. Since it will affect how `TableReader` reads a
|
||||||
|
// table file, it's defaulted to optimize for the majority of the case where
|
||||||
|
// the user key's format in the external file matches the column family's
|
||||||
|
// setting.
|
||||||
|
bool user_defined_timestamps_persisted = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ExternalSstFileIngestionJob {
|
class ExternalSstFileIngestionJob {
|
||||||
|
@ -151,6 +159,23 @@ class ExternalSstFileIngestionJob {
|
||||||
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }
|
int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Status ResetTableReader(const std::string& external_file,
|
||||||
|
uint64_t new_file_number,
|
||||||
|
bool user_defined_timestamps_persisted,
|
||||||
|
SuperVersion* sv, IngestedFileInfo* file_to_ingest,
|
||||||
|
std::unique_ptr<TableReader>* table_reader);
|
||||||
|
|
||||||
|
// Read the external file's table properties to do various sanity checks and
|
||||||
|
// populates certain fields in `IngestedFileInfo` according to some table
|
||||||
|
// properties.
|
||||||
|
// In some cases when sanity check passes, `table_reader` could be reset with
|
||||||
|
// different options. For example: when external file does not contain
|
||||||
|
// timestamps while column family enables UDT in Memtables only feature.
|
||||||
|
Status SanityCheckTableProperties(const std::string& external_file,
|
||||||
|
uint64_t new_file_number, SuperVersion* sv,
|
||||||
|
IngestedFileInfo* file_to_ingest,
|
||||||
|
std::unique_ptr<TableReader>* table_reader);
|
||||||
|
|
||||||
// Open the external file and populate `file_to_ingest` with all the
|
// Open the external file and populate `file_to_ingest` with all the
|
||||||
// external information we need to ingest this file.
|
// external information we need to ingest this file.
|
||||||
Status GetIngestedFileInfo(const std::string& external_file,
|
Status GetIngestedFileInfo(const std::string& external_file,
|
||||||
|
|
|
@ -295,6 +295,25 @@ class ExternalSSTFileTest
|
||||||
int last_file_id_ = 0;
|
int last_file_id_ = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TEST_F(ExternalSSTFileTest, ComparatorMismatch) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
Options options_diff_ucmp = options;
|
||||||
|
|
||||||
|
options.comparator = BytewiseComparator();
|
||||||
|
options_diff_ucmp.comparator = ReverseBytewiseComparator();
|
||||||
|
|
||||||
|
SstFileWriter sst_file_writer(EnvOptions(), options_diff_ucmp);
|
||||||
|
|
||||||
|
std::string file = sst_files_dir_ + "file.sst";
|
||||||
|
ASSERT_OK(sst_file_writer.Open(file));
|
||||||
|
ASSERT_OK(sst_file_writer.Put("foo", "val"));
|
||||||
|
ASSERT_OK(sst_file_writer.Put("bar", "val1"));
|
||||||
|
ASSERT_OK(sst_file_writer.Finish());
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
ASSERT_NOK(DeprecatedAddFile({file}));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ExternalSSTFileTest, Basic) {
|
TEST_F(ExternalSSTFileTest, Basic) {
|
||||||
do {
|
do {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
|
@ -3342,6 +3361,252 @@ TEST_F(ExternalSSTFileWithTimestampTest, SanityCheck) {
|
||||||
DestroyAndRecreateExternalSSTFilesDir();
|
DestroyAndRecreateExternalSSTFilesDir();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ExternalSSTFileWithTimestampTest, UDTSettingsCompatibilityCheck) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
Options disable_udt_options = options;
|
||||||
|
Options not_persist_udt_options = options;
|
||||||
|
Options persist_udt_options = options;
|
||||||
|
disable_udt_options.comparator = BytewiseComparator();
|
||||||
|
not_persist_udt_options.comparator =
|
||||||
|
test::BytewiseComparatorWithU64TsWrapper();
|
||||||
|
not_persist_udt_options.persist_user_defined_timestamps = false;
|
||||||
|
not_persist_udt_options.allow_concurrent_memtable_write = false;
|
||||||
|
persist_udt_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||||
|
persist_udt_options.persist_user_defined_timestamps = true;
|
||||||
|
|
||||||
|
EnvOptions env_options = EnvOptions();
|
||||||
|
|
||||||
|
SstFileWriter disable_udt_sst_writer(env_options, disable_udt_options);
|
||||||
|
SstFileWriter not_persist_udt_sst_writer(env_options,
|
||||||
|
not_persist_udt_options);
|
||||||
|
SstFileWriter persist_udt_sst_writer(env_options, persist_udt_options);
|
||||||
|
|
||||||
|
// File1: [0, 50), contains no timestamps
|
||||||
|
// comparator name: leveldb.BytewiseComparator
|
||||||
|
// user_defined_timestamps_persisted: true
|
||||||
|
std::string disable_udt_sst_file = sst_files_dir_ + "file1.sst";
|
||||||
|
ASSERT_OK(disable_udt_sst_writer.Open(disable_udt_sst_file));
|
||||||
|
for (int k = 0; k < 50; k++) {
|
||||||
|
ASSERT_NOK(
|
||||||
|
disable_udt_sst_writer.Put(Key(k), EncodeAsUint64(1), Key(k) + "_val"));
|
||||||
|
ASSERT_OK(disable_udt_sst_writer.Put(Key(k), Key(k) + "_val"));
|
||||||
|
}
|
||||||
|
ASSERT_OK(disable_udt_sst_writer.Finish());
|
||||||
|
|
||||||
|
// File2: [50, 100), contains no timestamps
|
||||||
|
// comparator name: leveldb.BytewiseComparator.u64ts
|
||||||
|
// user_defined_timestamps_persisted: false
|
||||||
|
std::string not_persist_udt_sst_file = sst_files_dir_ + "file2.sst";
|
||||||
|
ASSERT_OK(not_persist_udt_sst_writer.Open(not_persist_udt_sst_file));
|
||||||
|
for (int k = 50; k < 100; k++) {
|
||||||
|
ASSERT_NOK(not_persist_udt_sst_writer.Put(Key(k), Key(k) + "_val"));
|
||||||
|
ASSERT_NOK(not_persist_udt_sst_writer.Put(Key(k), EncodeAsUint64(k),
|
||||||
|
Key(k) + "_val"));
|
||||||
|
ASSERT_OK(not_persist_udt_sst_writer.Put(Key(k), EncodeAsUint64(0),
|
||||||
|
Key(k) + "_val"));
|
||||||
|
}
|
||||||
|
ASSERT_OK(not_persist_udt_sst_writer.Finish());
|
||||||
|
|
||||||
|
// File3: [100, 150), contains timestamp
|
||||||
|
// comparator name: leveldb.BytewiseComparator.u64ts
|
||||||
|
// user_defined_timestamps_persisted: true
|
||||||
|
std::string persist_udt_sst_file = sst_files_dir_ + "file3.sst";
|
||||||
|
ASSERT_OK(persist_udt_sst_writer.Open(persist_udt_sst_file));
|
||||||
|
for (int k = 100; k < 150; k++) {
|
||||||
|
ASSERT_NOK(persist_udt_sst_writer.Put(Key(k), Key(k) + "_val"));
|
||||||
|
ASSERT_OK(
|
||||||
|
persist_udt_sst_writer.Put(Key(k), EncodeAsUint64(k), Key(k) + "_val"));
|
||||||
|
}
|
||||||
|
ASSERT_OK(persist_udt_sst_writer.Finish());
|
||||||
|
|
||||||
|
DestroyAndReopen(disable_udt_options);
|
||||||
|
ASSERT_OK(
|
||||||
|
IngestExternalUDTFile({disable_udt_sst_file, not_persist_udt_sst_file}));
|
||||||
|
ASSERT_NOK(IngestExternalUDTFile({persist_udt_sst_file}));
|
||||||
|
for (int k = 0; k < 100; k++) {
|
||||||
|
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
||||||
|
}
|
||||||
|
|
||||||
|
DestroyAndReopen(not_persist_udt_options);
|
||||||
|
ASSERT_OK(
|
||||||
|
IngestExternalUDTFile({disable_udt_sst_file, not_persist_udt_sst_file}));
|
||||||
|
ASSERT_NOK(IngestExternalUDTFile({persist_udt_sst_file}));
|
||||||
|
for (int k = 0; k < 100; k++) {
|
||||||
|
VerifyValueAndTs(Key(k), EncodeAsUint64(0), Key(k) + "_val",
|
||||||
|
EncodeAsUint64(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
DestroyAndReopen(persist_udt_options);
|
||||||
|
ASSERT_NOK(
|
||||||
|
IngestExternalUDTFile({disable_udt_sst_file, not_persist_udt_sst_file}));
|
||||||
|
ASSERT_OK(IngestExternalUDTFile({persist_udt_sst_file}));
|
||||||
|
for (int k = 100; k < 150; k++) {
|
||||||
|
VerifyValueAndTs(Key(k), EncodeAsUint64(k), Key(k) + "_val",
|
||||||
|
EncodeAsUint64(k));
|
||||||
|
}
|
||||||
|
|
||||||
|
DestroyAndRecreateExternalSSTFilesDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ExternalSSTFileWithTimestampTest, TimestampsNotPersistedBasic) {
|
||||||
|
do {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||||
|
options.persist_user_defined_timestamps = false;
|
||||||
|
options.allow_concurrent_memtable_write = false;
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||||
|
|
||||||
|
// file1.sst [0, 50)
|
||||||
|
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||||
|
ASSERT_OK(sst_file_writer.Open(file1));
|
||||||
|
for (int k = 0; k < 50; k++) {
|
||||||
|
// Attempting to write 2 versions of values for each key, only the version
|
||||||
|
// with timestamp 0 goes through.
|
||||||
|
for (int version = 1; version >= 0; version--) {
|
||||||
|
if (version == 1) {
|
||||||
|
ASSERT_NOK(
|
||||||
|
sst_file_writer.Put(Key(k), EncodeAsUint64(version),
|
||||||
|
Key(k) + "_val" + std::to_string(version)));
|
||||||
|
} else {
|
||||||
|
ASSERT_OK(
|
||||||
|
sst_file_writer.Put(Key(k), EncodeAsUint64(version),
|
||||||
|
Key(k) + "_val" + std::to_string(version)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ExternalSstFileInfo file1_info;
|
||||||
|
ASSERT_OK(sst_file_writer.Finish(&file1_info));
|
||||||
|
// sst_file_writer already finished, cannot add this value
|
||||||
|
ASSERT_NOK(sst_file_writer.Put(Key(100), EncodeAsUint64(0), "bad_val"));
|
||||||
|
|
||||||
|
ASSERT_EQ(file1_info.file_path, file1);
|
||||||
|
ASSERT_EQ(file1_info.num_entries, 50);
|
||||||
|
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||||
|
ASSERT_EQ(file1_info.largest_key, Key(49));
|
||||||
|
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
|
||||||
|
// Add file using file path
|
||||||
|
ASSERT_OK(IngestExternalUDTFile({file1}));
|
||||||
|
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
|
||||||
|
|
||||||
|
// Read ingested file: all data contain minimum timestamps.
|
||||||
|
for (int k = 0; k < 50; k++) {
|
||||||
|
VerifyValueAndTs(Key(k), EncodeAsUint64(0),
|
||||||
|
Key(k) + "_val" + std::to_string(0), EncodeAsUint64(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
// file2.sst [50, 200)
|
||||||
|
// Put [key=k, ts=0, value=k_val0] for k in [50, 200)
|
||||||
|
// RangeDelete[start_key=75, end_key=125, ts=0]
|
||||||
|
std::string file2 = sst_files_dir_ + "file2.sst";
|
||||||
|
int range_del_begin = 75, range_del_end = 125;
|
||||||
|
ASSERT_OK(sst_file_writer.Open(file2));
|
||||||
|
for (int k = 50; k < 200; k++) {
|
||||||
|
// All these timestamps will later be effectively 0
|
||||||
|
ASSERT_OK(
|
||||||
|
sst_file_writer.Put(Key(k), EncodeAsUint64(0), Key(k) + "_val0"));
|
||||||
|
}
|
||||||
|
ASSERT_OK(sst_file_writer.DeleteRange(
|
||||||
|
Key(range_del_begin), Key(range_del_end), EncodeAsUint64(0)));
|
||||||
|
|
||||||
|
ExternalSstFileInfo file2_info;
|
||||||
|
ASSERT_OK(sst_file_writer.Finish(&file2_info));
|
||||||
|
|
||||||
|
// Current file size should be non-zero after success write.
|
||||||
|
ASSERT_GT(sst_file_writer.FileSize(), 0);
|
||||||
|
|
||||||
|
ASSERT_EQ(file2_info.file_path, file2);
|
||||||
|
ASSERT_EQ(file2_info.num_entries, 150);
|
||||||
|
ASSERT_EQ(file2_info.smallest_key, Key(50));
|
||||||
|
ASSERT_EQ(file2_info.largest_key, Key(199));
|
||||||
|
ASSERT_EQ(file2_info.num_range_del_entries, 1);
|
||||||
|
ASSERT_EQ(file2_info.smallest_range_del_key, Key(range_del_begin));
|
||||||
|
ASSERT_EQ(file2_info.largest_range_del_key, Key(range_del_end));
|
||||||
|
// Add file using file path
|
||||||
|
ASSERT_OK(IngestExternalUDTFile({file2}));
|
||||||
|
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
|
||||||
|
|
||||||
|
// Range deletion covering point data in the same file is over-written.
|
||||||
|
for (int k = 50; k < 200; k++) {
|
||||||
|
VerifyValueAndTs(Key(k), EncodeAsUint64(0), Key(k) + "_val0",
|
||||||
|
EncodeAsUint64(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
// file3.sst [100, 200), key range overlap with db
|
||||||
|
std::string file3 = sst_files_dir_ + "file3.sst";
|
||||||
|
ASSERT_OK(sst_file_writer.Open(file3));
|
||||||
|
for (int k = 100; k < 200; k++) {
|
||||||
|
ASSERT_OK(
|
||||||
|
sst_file_writer.Put(Key(k), EncodeAsUint64(0), Key(k) + "_val0"));
|
||||||
|
}
|
||||||
|
ExternalSstFileInfo file3_info;
|
||||||
|
ASSERT_OK(sst_file_writer.Finish(&file3_info));
|
||||||
|
ASSERT_EQ(file3_info.file_path, file3);
|
||||||
|
ASSERT_EQ(file3_info.num_entries, 100);
|
||||||
|
ASSERT_EQ(file3_info.smallest_key, Key(100));
|
||||||
|
ASSERT_EQ(file3_info.largest_key, Key(199));
|
||||||
|
|
||||||
|
// In UDT mode, file with overlapping key range cannot be ingested.
|
||||||
|
ASSERT_NOK(IngestExternalUDTFile({file3}));
|
||||||
|
ASSERT_NOK(IngestExternalUDTFile({file3}, /*allow_global_seqno*/ false));
|
||||||
|
|
||||||
|
// Write [0, 50)
|
||||||
|
// Write to DB newer versions to cover ingested data and move sequence
|
||||||
|
// number forward.
|
||||||
|
for (int k = 0; k < 50; k++) {
|
||||||
|
for (int version = 1; version < 3; version++) {
|
||||||
|
ASSERT_OK(dbfull()->Put(WriteOptions(), Key(k), EncodeAsUint64(version),
|
||||||
|
Key(k) + "_val" + std::to_string(version)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read three versions (1 from ingested, 2 from live writes)
|
||||||
|
for (int k = 0; k < 50; k++) {
|
||||||
|
for (int version = 0; version < 3; version++) {
|
||||||
|
VerifyValueAndTs(Key(k), EncodeAsUint64(version),
|
||||||
|
Key(k) + "_val" + std::to_string(version),
|
||||||
|
EncodeAsUint64(version));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SequenceNumber seq_num_before_ingestion = db_->GetLatestSequenceNumber();
|
||||||
|
ASSERT_GT(seq_num_before_ingestion, 0U);
|
||||||
|
|
||||||
|
// file4.sst [200, 250)
|
||||||
|
std::string file4 = sst_files_dir_ + "file4.sst";
|
||||||
|
ASSERT_OK(sst_file_writer.Open(file4));
|
||||||
|
for (int k = 200; k < 250; k++) {
|
||||||
|
ASSERT_OK(
|
||||||
|
sst_file_writer.Put(Key(k), EncodeAsUint64(0), Key(k) + "_val"));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExternalSstFileInfo file4_info;
|
||||||
|
ASSERT_OK(sst_file_writer.Finish(&file4_info));
|
||||||
|
|
||||||
|
// Current file size should be non-zero after success write.
|
||||||
|
ASSERT_GT(sst_file_writer.FileSize(), 0);
|
||||||
|
|
||||||
|
ASSERT_EQ(file4_info.file_path, file4);
|
||||||
|
ASSERT_EQ(file4_info.num_entries, 50);
|
||||||
|
ASSERT_EQ(file4_info.smallest_key, Key(200));
|
||||||
|
ASSERT_EQ(file4_info.largest_key, Key(249));
|
||||||
|
ASSERT_EQ(file4_info.num_range_del_entries, 0);
|
||||||
|
ASSERT_EQ(file4_info.smallest_range_del_key, "");
|
||||||
|
ASSERT_EQ(file4_info.largest_range_del_key, "");
|
||||||
|
|
||||||
|
ASSERT_OK(IngestExternalUDTFile({file4}));
|
||||||
|
|
||||||
|
// Ingested files do not overlap with db, they can always have global seqno
|
||||||
|
// 0 assigned.
|
||||||
|
ASSERT_EQ(db_->GetLatestSequenceNumber(), seq_num_before_ingestion);
|
||||||
|
|
||||||
|
DestroyAndRecreateExternalSSTFilesDir();
|
||||||
|
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
|
||||||
|
kRangeDelSkipConfigs));
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
|
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
|
||||||
testing::Values(std::make_tuple(false, false),
|
testing::Values(std::make_tuple(false, false),
|
||||||
std::make_tuple(false, true),
|
std::make_tuple(false, true),
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
*Add sanity checks for ingesting external files that currently checks if the user key comparator used to create the file is compatible with the column family's user key comparator.
|
||||||
|
*Support ingesting external files for column family that has user-defined timestamps in memtable only enabled.
|
Loading…
Reference in New Issue