mirror of https://github.com/facebook/rocksdb.git
Fix/improve temperature handling for file ingestion (#12402)
Summary: Partly following up on leftovers from https://github.com/facebook/rocksdb/issues/12388 In terms of public API: * Make it clear that IngestExternalFileArg::file_temperature is just a hint for opening the existing file, though it was previously used for both copy-from temp hint and copy-to temp, which was bizarre. * Specify how IngestExternalFile assigns temperature to file ingested into DB. (See details in comments.) This approach is not perfect in terms of matching how the DB assigns temperatures, but was the simplest way to get close. The key complication for matching DB temperature assignments is that ingestion files are copied (to a destination temp) before their target level is determined (in general). * Add a temperature option to SstFileWriter::Open so that files intended for ingestion can be initially written to a chosen temperature. * Note that "fail_if_not_bottommost_level" is obsolete/confusing use of "bottommost" In terms of the implementation, there was a similar bit of oddness with the internal CopyFile API, which only took one temperature, ambiguously applicable to the source, destination, or both. This is also fixed. Eventual suggested follow-up: * Before copying files for ingestion, determine a tentative level assignment to use for destination temperature, and keep that even if final level assignment happens to be different at commit time (rare). * More temperature handling for CreateColumnFamilyWithImport and Checkpoints. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12402 Test Plan: Deeply revamped ExternalSSTFileBasicTest.IngestWithTemperature to test the new changes. Previously this test was insufficient because it was only looking at temperatures according to the DB manifest. Incorporating FileTemperatureTestFS allows us to also test the temperatures in the storage layer. Used macros instead of functions for better tracing to critical source location on test failures. Some enhancements to FileTemperatureTestFS in the process of developing the revamped test. Reviewed By: jowlyzhang Differential Revision: D54442794 Pulled By: pdillinger fbshipit-source-id: 41d9d0afdc073e6a983304c10bbc07c70cc7e995
This commit is contained in:
parent
3412195367
commit
a53ed91691
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "file/filename.h"
|
||||
#include "options/options_helper.h"
|
||||
#include "rocksdb/advanced_options.h"
|
||||
#include "rocksdb/cache.h"
|
||||
#include "rocksdb/compaction_filter.h"
|
||||
|
@ -729,7 +730,11 @@ class FileTemperatureTestFS : public FileSystemWrapper {
|
|||
if (e != current_sst_file_temperatures_.end() &&
|
||||
e->second != opts.temperature) {
|
||||
result->reset();
|
||||
return IOStatus::PathNotFound("Temperature mismatch on " + fname);
|
||||
return IOStatus::PathNotFound(
|
||||
"Read requested temperature " +
|
||||
temperature_to_string[opts.temperature] +
|
||||
" but stored with temperature " +
|
||||
temperature_to_string[e->second] + " for " + fname);
|
||||
}
|
||||
}
|
||||
*result = WrapWithTemperature<FSSequentialFileOwnerWrapper>(
|
||||
|
@ -758,7 +763,11 @@ class FileTemperatureTestFS : public FileSystemWrapper {
|
|||
if (e != current_sst_file_temperatures_.end() &&
|
||||
e->second != opts.temperature) {
|
||||
result->reset();
|
||||
return IOStatus::PathNotFound("Temperature mismatch on " + fname);
|
||||
return IOStatus::PathNotFound(
|
||||
"Read requested temperature " +
|
||||
temperature_to_string[opts.temperature] +
|
||||
" but stored with temperature " +
|
||||
temperature_to_string[e->second] + " for " + fname);
|
||||
}
|
||||
}
|
||||
*result = WrapWithTemperature<FSRandomAccessFileOwnerWrapper>(
|
||||
|
@ -792,11 +801,37 @@ class FileTemperatureTestFS : public FileSystemWrapper {
|
|||
return target()->NewWritableFile(fname, opts, result, dbg);
|
||||
}
|
||||
|
||||
IOStatus DeleteFile(const std::string& fname, const IOOptions& options,
|
||||
IODebugContext* dbg) override {
|
||||
IOStatus ios = target()->DeleteFile(fname, options, dbg);
|
||||
if (ios.ok()) {
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
if (ParseFileName(GetFileName(fname), &number, &type) &&
|
||||
type == kTableFile) {
|
||||
MutexLock lock(&mu_);
|
||||
current_sst_file_temperatures_.erase(number);
|
||||
}
|
||||
}
|
||||
return ios;
|
||||
}
|
||||
|
||||
void CopyCurrentSstFileTemperatures(std::map<uint64_t, Temperature>* out) {
|
||||
MutexLock lock(&mu_);
|
||||
*out = current_sst_file_temperatures_;
|
||||
}
|
||||
|
||||
size_t CountCurrentSstFilesWithTemperature(Temperature temp) {
|
||||
MutexLock lock(&mu_);
|
||||
size_t count = 0;
|
||||
for (const auto& e : current_sst_file_temperatures_) {
|
||||
if (e.second == temp) {
|
||||
++count;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
void OverrideSstFileTemperature(uint64_t number, Temperature temp) {
|
||||
MutexLock lock(&mu_);
|
||||
current_sst_file_temperatures_[number] = temp;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "rocksdb/sst_file_writer.h"
|
||||
#include "test_util/testharness.h"
|
||||
#include "test_util/testutil.h"
|
||||
#include "util/defer.h"
|
||||
#include "util/random.h"
|
||||
#include "utilities/fault_injection_env.h"
|
||||
|
||||
|
@ -1861,100 +1862,165 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileAfterDBPut) {
|
|||
}
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, IngestWithTemperature) {
|
||||
Options options = CurrentOptions();
|
||||
const ImmutableCFOptions ioptions(options);
|
||||
options.last_level_temperature = Temperature::kWarm;
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
Reopen(options);
|
||||
// Rather than doubling the running time of this test, this boolean
|
||||
// field gets a random starting value and then alternates between
|
||||
// true and false.
|
||||
bool alternate_hint = Random::GetTLSInstance()->OneIn(2);
|
||||
Destroy(CurrentOptions());
|
||||
|
||||
auto size = GetSstSizeHelper(Temperature::kUnknown);
|
||||
ASSERT_EQ(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_EQ(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kHot);
|
||||
ASSERT_EQ(size, 0);
|
||||
for (std::string mode : {"ingest_behind", "fail_if_not", "neither"}) {
|
||||
SCOPED_TRACE("Mode: " + mode);
|
||||
|
||||
// create file01.sst (1000 => 1099) and ingest it
|
||||
std::string file1 = sst_files_dir_ + "file01.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 1000; k < 1100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
Options options = CurrentOptions();
|
||||
|
||||
auto test_fs =
|
||||
std::make_shared<FileTemperatureTestFS>(options.env->GetFileSystem());
|
||||
std::unique_ptr<Env> env(new CompositeEnvWrapper(options.env, test_fs));
|
||||
options.env = env.get();
|
||||
|
||||
const ImmutableCFOptions ioptions(options);
|
||||
options.last_level_temperature = Temperature::kCold;
|
||||
options.default_write_temperature = Temperature::kHot;
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.allow_ingest_behind = (mode == "ingest_behind");
|
||||
Reopen(options);
|
||||
Defer destroyer([&]() { Destroy(options); });
|
||||
|
||||
#define VERIFY_SST_COUNT(temp, expected_count_in_db, \
|
||||
expected_count_outside_db) \
|
||||
{ \
|
||||
/* Partially verify against FileSystem */ \
|
||||
ASSERT_EQ( \
|
||||
test_fs->CountCurrentSstFilesWithTemperature(temp), \
|
||||
size_t{expected_count_in_db} + size_t{expected_count_outside_db}); \
|
||||
/* Partially verify against DB manifest */ \
|
||||
if (expected_count_in_db == 0) { \
|
||||
ASSERT_EQ(GetSstSizeHelper(temp), 0); \
|
||||
} else { \
|
||||
ASSERT_GE(GetSstSizeHelper(temp), 1); \
|
||||
} \
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ(file1_info.file_path, file1);
|
||||
ASSERT_EQ(file1_info.num_entries, 100);
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(1000));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(1099));
|
||||
|
||||
std::vector<std::string> files;
|
||||
std::vector<std::string> files_checksums;
|
||||
std::vector<std::string> files_checksum_func_names;
|
||||
Temperature file_temperature = Temperature::kWarm;
|
||||
size_t ex_unknown_in_db = 0;
|
||||
size_t ex_hot_in_db = 0;
|
||||
size_t ex_warm_in_db = 0;
|
||||
size_t ex_cold_in_db = 0;
|
||||
size_t ex_unknown_outside_db = 0;
|
||||
size_t ex_hot_outside_db = 0;
|
||||
size_t ex_warm_outside_db = 0;
|
||||
size_t ex_cold_outside_db = 0;
|
||||
#define VERIFY_SST_COUNTS() \
|
||||
{ \
|
||||
VERIFY_SST_COUNT(Temperature::kUnknown, ex_unknown_in_db, \
|
||||
ex_unknown_outside_db); \
|
||||
VERIFY_SST_COUNT(Temperature::kHot, ex_hot_in_db, ex_hot_outside_db); \
|
||||
VERIFY_SST_COUNT(Temperature::kWarm, ex_warm_in_db, ex_warm_outside_db); \
|
||||
VERIFY_SST_COUNT(Temperature::kCold, ex_cold_in_db, ex_cold_outside_db); \
|
||||
}
|
||||
|
||||
files.push_back(file1);
|
||||
IngestExternalFileOptions in_opts;
|
||||
in_opts.move_files = false;
|
||||
in_opts.snapshot_consistency = true;
|
||||
in_opts.allow_global_seqno = false;
|
||||
in_opts.allow_blocking_flush = false;
|
||||
in_opts.write_global_seqno = true;
|
||||
in_opts.verify_file_checksum = false;
|
||||
IngestExternalFileArg arg;
|
||||
arg.column_family = db_->DefaultColumnFamily();
|
||||
arg.external_files = files;
|
||||
arg.options = in_opts;
|
||||
arg.files_checksums = files_checksums;
|
||||
arg.files_checksum_func_names = files_checksum_func_names;
|
||||
arg.file_temperature = file_temperature;
|
||||
s = db_->IngestExternalFiles({arg});
|
||||
ASSERT_OK(s);
|
||||
// Create sst file, using a name recognized by FileTemperatureTestFS and
|
||||
// specified temperature
|
||||
std::string file1 = sst_files_dir_ + "9000000.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1, Temperature::kWarm));
|
||||
for (int k = 1000; k < 1100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
ASSERT_OK(s);
|
||||
|
||||
// check the temperature of the file being ingested
|
||||
ColumnFamilyMetaData metadata;
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(1, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kWarm, metadata.levels[6].files[0].temperature);
|
||||
size = GetSstSizeHelper(Temperature::kUnknown);
|
||||
ASSERT_EQ(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_GT(size, 1);
|
||||
ex_warm_outside_db++;
|
||||
VERIFY_SST_COUNTS();
|
||||
|
||||
// non-bottommost file still has unknown temperature
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(2, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
|
||||
size = GetSstSizeHelper(Temperature::kUnknown);
|
||||
ASSERT_GT(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_GT(size, 0);
|
||||
ASSERT_EQ(file1_info.file_path, file1);
|
||||
ASSERT_EQ(file1_info.num_entries, 100);
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(1000));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(1099));
|
||||
|
||||
// reopen and check the information is persisted
|
||||
Reopen(options);
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(2, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
|
||||
ASSERT_EQ(Temperature::kWarm, metadata.levels[6].files[0].temperature);
|
||||
size = GetSstSizeHelper(Temperature::kUnknown);
|
||||
ASSERT_GT(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_GT(size, 0);
|
||||
std::vector<std::string> files;
|
||||
std::vector<std::string> files_checksums;
|
||||
std::vector<std::string> files_checksum_func_names;
|
||||
|
||||
// check other non-exist temperatures
|
||||
size = GetSstSizeHelper(Temperature::kHot);
|
||||
ASSERT_EQ(size, 0);
|
||||
size = GetSstSizeHelper(Temperature::kCold);
|
||||
ASSERT_EQ(size, 0);
|
||||
std::string prop;
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
|
||||
&prop));
|
||||
ASSERT_EQ(std::atoi(prop.c_str()), 0);
|
||||
files.push_back(file1);
|
||||
IngestExternalFileOptions in_opts;
|
||||
in_opts.move_files = false;
|
||||
in_opts.snapshot_consistency = true;
|
||||
in_opts.allow_global_seqno = false;
|
||||
in_opts.allow_blocking_flush = false;
|
||||
in_opts.write_global_seqno = true;
|
||||
in_opts.verify_file_checksum = false;
|
||||
in_opts.ingest_behind = (mode == "ingest_behind");
|
||||
in_opts.fail_if_not_bottommost_level = (mode == "fail_if_not");
|
||||
IngestExternalFileArg arg;
|
||||
arg.column_family = db_->DefaultColumnFamily();
|
||||
arg.external_files = files;
|
||||
arg.options = in_opts;
|
||||
arg.files_checksums = files_checksums;
|
||||
arg.files_checksum_func_names = files_checksum_func_names;
|
||||
if ((alternate_hint = !alternate_hint)) {
|
||||
// Provide correct hint (for optimal file open performance)
|
||||
arg.file_temperature = Temperature::kWarm;
|
||||
} else {
|
||||
// No hint (also works because ingestion will read the temperature
|
||||
// according to storage)
|
||||
arg.file_temperature = Temperature::kUnknown;
|
||||
}
|
||||
s = db_->IngestExternalFiles({arg});
|
||||
ASSERT_OK(s);
|
||||
|
||||
// check the temperature of the file ingested (copied)
|
||||
ColumnFamilyMetaData metadata;
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(1, metadata.file_count);
|
||||
|
||||
if (mode != "neither") {
|
||||
ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
|
||||
ex_cold_in_db++;
|
||||
} else {
|
||||
// Currently, we are only able to use last_level_temperature for ingestion
|
||||
// when using an ingestion option that guarantees ingestion to last level.
|
||||
ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
|
||||
ex_hot_in_db++;
|
||||
}
|
||||
VERIFY_SST_COUNTS();
|
||||
|
||||
// non-bottommost file still has kHot temperature
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(2, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);
|
||||
|
||||
ex_hot_in_db++;
|
||||
VERIFY_SST_COUNTS();
|
||||
|
||||
// reopen and check the information is persisted
|
||||
Reopen(options);
|
||||
db_->GetColumnFamilyMetaData(&metadata);
|
||||
ASSERT_EQ(2, metadata.file_count);
|
||||
ASSERT_EQ(Temperature::kHot, metadata.levels[0].files[0].temperature);
|
||||
if (mode != "neither") {
|
||||
ASSERT_EQ(Temperature::kCold, metadata.levels[6].files[0].temperature);
|
||||
} else {
|
||||
ASSERT_EQ(Temperature::kHot, metadata.levels[6].files[0].temperature);
|
||||
}
|
||||
|
||||
// (no change)
|
||||
VERIFY_SST_COUNTS();
|
||||
|
||||
// check invalid temperature with DB property. Not sure why the original
|
||||
// author is testing this case, but perhaps so that downgrading DB with
|
||||
// new GetProperty code using a new Temperature will report something
|
||||
// reasonable and not an error.
|
||||
std::string prop;
|
||||
ASSERT_TRUE(dbfull()->GetProperty(
|
||||
DB::Properties::kLiveSstFilesSizeAtTemperature + std::to_string(22),
|
||||
&prop));
|
||||
ASSERT_EQ(std::atoi(prop.c_str()), 0);
|
||||
#undef VERIFY_SST_COUNT
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, FailIfNotBottommostLevel) {
|
||||
|
|
|
@ -37,6 +37,8 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
// Read the information of files we are ingesting
|
||||
for (const std::string& file_path : external_files_paths) {
|
||||
IngestedFileInfo file_to_ingest;
|
||||
// For temperature, first assume it matches provided hint
|
||||
file_to_ingest.file_temperature = file_temperature;
|
||||
status =
|
||||
GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv);
|
||||
if (!status.ok()) {
|
||||
|
@ -90,11 +92,6 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
}
|
||||
}
|
||||
|
||||
// Hanlde the file temperature
|
||||
for (size_t i = 0; i < num_files; i++) {
|
||||
files_to_ingest_[i].file_temperature = file_temperature;
|
||||
}
|
||||
|
||||
if (ingestion_options_.ingest_behind && files_overlap_) {
|
||||
return Status::NotSupported(
|
||||
"Files with overlapping ranges cannot be ingested with ingestion "
|
||||
|
@ -159,12 +156,25 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||
if (f.copy_file) {
|
||||
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
|
||||
nullptr);
|
||||
// CopyFile also sync the new file.
|
||||
// FIXME: use sv->mutable_cf_options.default_write_temperature and
|
||||
// sort out exact temperature handling
|
||||
status =
|
||||
CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
|
||||
// Always determining the destination temperature from the ingested-to
|
||||
// level would be difficult because in general we only find out the level
|
||||
// ingested to later, during Run().
|
||||
// However, we can guarantee "last level" temperature for when the user
|
||||
// requires ingestion to the last level.
|
||||
Temperature dst_temp =
|
||||
(ingestion_options_.ingest_behind ||
|
||||
ingestion_options_.fail_if_not_bottommost_level)
|
||||
? sv->mutable_cf_options.last_level_temperature
|
||||
: sv->mutable_cf_options.default_write_temperature;
|
||||
// Note: CopyFile also syncs the new file.
|
||||
status = CopyFile(fs_.get(), path_outside_db, f.file_temperature,
|
||||
path_inside_db, dst_temp, 0, db_options_.use_fsync,
|
||||
io_tracer_);
|
||||
// The destination of the copy will be ingested
|
||||
f.file_temperature = dst_temp;
|
||||
} else {
|
||||
// Note: we currently assume that linking files does not cross
|
||||
// temperatures, so no need to change f.file_temperature
|
||||
}
|
||||
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
|
||||
if (!status.ok()) {
|
||||
|
@ -651,11 +661,19 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
|
|||
IngestedFileInfo* file_to_ingest,
|
||||
std::unique_ptr<TableReader>* table_reader) {
|
||||
std::unique_ptr<FSRandomAccessFile> sst_file;
|
||||
FileOptions fo{env_options_};
|
||||
fo.temperature = file_to_ingest->file_temperature;
|
||||
Status status =
|
||||
fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
|
||||
fs_->NewRandomAccessFile(external_file, fo, &sst_file, nullptr);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
Temperature updated_temp = sst_file->GetTemperature();
|
||||
if (updated_temp != Temperature::kUnknown &&
|
||||
updated_temp != file_to_ingest->file_temperature) {
|
||||
// The hint was missing or wrong. Track temperature reported by storage.
|
||||
file_to_ingest->file_temperature = updated_temp;
|
||||
}
|
||||
std::unique_ptr<RandomAccessFileReader> sst_file_reader(
|
||||
new RandomAccessFileReader(std::move(sst_file), external_file,
|
||||
nullptr /*Env*/, io_tracer_));
|
||||
|
@ -1029,12 +1047,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
|
|||
Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
|
||||
IngestedFileInfo* file_to_ingest) {
|
||||
auto* vstorage = cfd_->current()->storage_info();
|
||||
// First, check if new files fit in the bottommost level
|
||||
int bottom_lvl = cfd_->NumberLevels() - 1;
|
||||
if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
|
||||
// First, check if new files fit in the last level
|
||||
int last_lvl = cfd_->NumberLevels() - 1;
|
||||
if (!IngestedFileFitInLevel(file_to_ingest, last_lvl)) {
|
||||
return Status::InvalidArgument(
|
||||
"Can't ingest_behind file as it doesn't fit "
|
||||
"at the bottommost level!");
|
||||
"at the last level!");
|
||||
}
|
||||
|
||||
// Second, check if despite allow_ingest_behind=true we still have 0 seqnums
|
||||
|
@ -1049,7 +1067,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
|
|||
}
|
||||
}
|
||||
|
||||
file_to_ingest->picked_level = bottom_lvl;
|
||||
file_to_ingest->picked_level = last_lvl;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -126,9 +126,10 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
|
|||
}
|
||||
}
|
||||
if (!hardlink_files) {
|
||||
status =
|
||||
CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
|
||||
// FIXME: temperature handling (like ExternalSstFileIngestionJob)
|
||||
status = CopyFile(fs_.get(), path_outside_db, Temperature::kUnknown,
|
||||
path_inside_db, Temperature::kUnknown, 0,
|
||||
db_options_.use_fsync, io_tracer_);
|
||||
}
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
|
|
|
@ -116,9 +116,9 @@ UniqueIdVerifier::UniqueIdVerifier(const std::string& db_name, Env* env)
|
|||
new WritableFileWriter(std::move(file_writer), path_, FileOptions()));
|
||||
|
||||
if (size > 0) {
|
||||
st = CopyFile(fs.get(), tmp_path, data_file_writer_, size,
|
||||
/*use_fsync*/ true, /*io_tracer*/ nullptr,
|
||||
/*temparature*/ Temperature::kHot);
|
||||
st = CopyFile(fs.get(), tmp_path, Temperature::kUnknown, data_file_writer_,
|
||||
size,
|
||||
/*use_fsync*/ true, /*io_tracer*/ nullptr);
|
||||
if (!st.ok()) {
|
||||
fprintf(stderr, "Error copying contents of old unique id file: %s\n",
|
||||
st.ToString().c_str());
|
||||
|
|
|
@ -308,9 +308,10 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
|
|||
|
||||
// Populate a tempfile and then rename it to atomically create "<seqno>.state"
|
||||
// with contents from "LATEST.state"
|
||||
Status s = CopyFile(FileSystem::Default(), latest_file_path,
|
||||
state_file_temp_path, 0 /* size */, false /* use_fsync */,
|
||||
nullptr /* io_tracer */, Temperature::kUnknown);
|
||||
Status s =
|
||||
CopyFile(FileSystem::Default(), latest_file_path, Temperature::kUnknown,
|
||||
state_file_temp_path, Temperature::kUnknown, 0 /* size */,
|
||||
false /* use_fsync */, nullptr /* io_tracer */);
|
||||
if (s.ok()) {
|
||||
s = FileSystem::Default()->RenameFile(state_file_temp_path, state_file_path,
|
||||
IOOptions(), nullptr /* dbg */);
|
||||
|
@ -633,9 +634,9 @@ Status FileExpectedStateManager::Restore(DB* db) {
|
|||
// We are going to replay on top of "`seqno`.state" to create a new
|
||||
// "LATEST.state". Start off by creating a tempfile so we can later make the
|
||||
// new "LATEST.state" appear atomically using `RenameFile()`.
|
||||
s = CopyFile(FileSystem::Default(), state_file_path, latest_file_temp_path,
|
||||
0 /* size */, false /* use_fsync */, nullptr /* io_tracer */,
|
||||
Temperature::kUnknown);
|
||||
s = CopyFile(FileSystem::Default(), state_file_path, Temperature::kUnknown,
|
||||
latest_file_temp_path, Temperature::kUnknown, 0 /* size */,
|
||||
false /* use_fsync */, nullptr /* io_tracer */);
|
||||
}
|
||||
|
||||
{
|
||||
|
|
|
@ -19,17 +19,17 @@ namespace ROCKSDB_NAMESPACE {
|
|||
|
||||
// Utility function to copy a file up to a specified length
|
||||
IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
Temperature src_temp_hint,
|
||||
std::unique_ptr<WritableFileWriter>& dest_writer,
|
||||
uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature) {
|
||||
const std::shared_ptr<IOTracer>& io_tracer) {
|
||||
FileOptions soptions;
|
||||
IOStatus io_s;
|
||||
std::unique_ptr<SequentialFileReader> src_reader;
|
||||
const IOOptions opts;
|
||||
|
||||
{
|
||||
soptions.temperature = temperature;
|
||||
soptions.temperature = src_temp_hint;
|
||||
std::unique_ptr<FSSequentialFile> srcfile;
|
||||
io_s = fs->NewSequentialFile(source, soptions, &srcfile, nullptr);
|
||||
if (!io_s.ok()) {
|
||||
|
@ -72,15 +72,15 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
|||
}
|
||||
|
||||
IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
const std::string& destination, uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature) {
|
||||
Temperature src_temp_hint, const std::string& destination,
|
||||
Temperature dst_temp, uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer) {
|
||||
FileOptions options;
|
||||
IOStatus io_s;
|
||||
std::unique_ptr<WritableFileWriter> dest_writer;
|
||||
|
||||
{
|
||||
options.temperature = temperature;
|
||||
options.temperature = dst_temp;
|
||||
std::unique_ptr<FSWritableFile> destfile;
|
||||
io_s = fs->NewWritableFile(destination, options, &destfile, nullptr);
|
||||
if (!io_s.ok()) {
|
||||
|
@ -92,8 +92,8 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
|||
new WritableFileWriter(std::move(destfile), destination, options));
|
||||
}
|
||||
|
||||
return CopyFile(fs, source, dest_writer, size, use_fsync, io_tracer,
|
||||
temperature);
|
||||
return CopyFile(fs, source, src_temp_hint, dest_writer, size, use_fsync,
|
||||
io_tracer);
|
||||
}
|
||||
|
||||
// Utility function to create a file with the provided contents
|
||||
|
|
|
@ -21,22 +21,21 @@ namespace ROCKSDB_NAMESPACE {
|
|||
// use_fsync maps to options.use_fsync, which determines the way that
|
||||
// the file is synced after copying.
|
||||
IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
Temperature src_temp_hint,
|
||||
std::unique_ptr<WritableFileWriter>& dest_writer,
|
||||
uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature);
|
||||
const std::shared_ptr<IOTracer>& io_tracer);
|
||||
IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
const std::string& destination, uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature);
|
||||
Temperature src_temp_hint, const std::string& destination,
|
||||
Temperature dst_temp, uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer);
|
||||
inline IOStatus CopyFile(const std::shared_ptr<FileSystem>& fs,
|
||||
const std::string& source,
|
||||
const std::string& destination, uint64_t size,
|
||||
bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature) {
|
||||
return CopyFile(fs.get(), source, destination, size, use_fsync, io_tracer,
|
||||
temperature);
|
||||
const std::string& source, Temperature src_temp_hint,
|
||||
const std::string& destination, Temperature dst_temp,
|
||||
uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer) {
|
||||
return CopyFile(fs.get(), source, src_temp_hint, destination, dst_temp, size,
|
||||
use_fsync, io_tracer);
|
||||
}
|
||||
IOStatus CreateFile(FileSystem* fs, const std::string& destination,
|
||||
const std::string& contents, bool use_fsync);
|
||||
|
|
|
@ -131,6 +131,7 @@ struct IngestExternalFileArg {
|
|||
IngestExternalFileOptions options;
|
||||
std::vector<std::string> files_checksums;
|
||||
std::vector<std::string> files_checksum_func_names;
|
||||
// A hint as to the temperature for *reading* the files to be ingested.
|
||||
Temperature file_temperature = Temperature::kUnknown;
|
||||
};
|
||||
|
||||
|
@ -1812,6 +1813,16 @@ class DB {
|
|||
// the files cannot be ingested to the bottommost level, and it is the
|
||||
// user's responsibility to clear the bottommost level in the overlapping
|
||||
// range before re-attempting the ingestion.
|
||||
//
|
||||
// EXPERIMENTAL: the temperatures of the files after ingestion are currently
|
||||
// determined like this:
|
||||
// - If the ingested file is moved rather than copied, its temperature is
|
||||
// inherited from the input file.
|
||||
// - If either ingest_behind or fail_if_not_bottommost_level is set to true,
|
||||
// then the temperature is set to the CF's last_level_temperature.
|
||||
// - Otherwise, the temperature is set to the CF's default_write_temperature.
|
||||
// (Landing in the last level does not currently guarantee using
|
||||
// last_level_temperature - TODO)
|
||||
virtual Status IngestExternalFile(
|
||||
ColumnFamilyHandle* column_family,
|
||||
const std::vector<std::string>& external_files,
|
||||
|
|
|
@ -2072,13 +2072,15 @@ struct IngestExternalFileOptions {
|
|||
// ingestion. However, if no checksum information is provided with the
|
||||
// ingested files, DB will generate the checksum and store in the Manifest.
|
||||
bool verify_file_checksum = true;
|
||||
// Set to TRUE if user wants file to be ingested to the bottommost level. An
|
||||
// Set to TRUE if user wants file to be ingested to the last level. An
|
||||
// error of Status::TryAgain() will be returned if a file cannot fit in the
|
||||
// bottommost level when calling
|
||||
// last level when calling
|
||||
// DB::IngestExternalFile()/DB::IngestExternalFiles(). The user should clear
|
||||
// the bottommost level in the overlapping range before re-attempt.
|
||||
// the last level in the overlapping range before re-attempt.
|
||||
//
|
||||
// ingest_behind takes precedence over fail_if_not_bottommost_level.
|
||||
//
|
||||
// XXX: "bottommost" is obsolete/confusing terminology to refer to last level
|
||||
bool fail_if_not_bottommost_level = false;
|
||||
};
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "advanced_options.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/table_properties.h"
|
||||
|
@ -110,7 +111,8 @@ class SstFileWriter {
|
|||
~SstFileWriter();
|
||||
|
||||
// Prepare SstFileWriter to write into file located at "file_path".
|
||||
Status Open(const std::string& file_path);
|
||||
Status Open(const std::string& file_path,
|
||||
Temperature temp = Temperature::kUnknown);
|
||||
|
||||
// Add a Put key with value to currently opened file (deprecated)
|
||||
// REQUIRES: user_key is after any previously added point (Put/Merge/Delete)
|
||||
|
|
|
@ -327,11 +327,12 @@ SstFileWriter::~SstFileWriter() {
|
|||
}
|
||||
}
|
||||
|
||||
Status SstFileWriter::Open(const std::string& file_path) {
|
||||
Status SstFileWriter::Open(const std::string& file_path, Temperature temp) {
|
||||
Rep* r = rep_.get();
|
||||
Status s;
|
||||
std::unique_ptr<FSWritableFile> sst_file;
|
||||
FileOptions cur_file_opts(r->env_options);
|
||||
cur_file_opts.temperature = temp;
|
||||
s = r->ioptions.env->GetFileSystem()->NewWritableFile(
|
||||
file_path, cur_file_opts, &sst_file, nullptr);
|
||||
if (!s.ok()) {
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
* Some enhancements and fixes to experimental Temperature handling features, including new `default_write_temperature` CF option and opening an `SstFileWriter` with a temperature.
|
|
@ -135,8 +135,9 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
|||
const Temperature temperature) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
|
||||
return CopyFile(db_->GetFileSystem(), src_dirname + "/" + fname,
|
||||
full_private_path + "/" + fname, size_limit_bytes,
|
||||
db_options.use_fsync, nullptr, temperature);
|
||||
temperature, full_private_path + "/" + fname,
|
||||
temperature, size_limit_bytes, db_options.use_fsync,
|
||||
nullptr);
|
||||
} /* copy_file_cb */,
|
||||
[&](const std::string& fname, const std::string& contents, FileType) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
|
||||
|
@ -332,9 +333,11 @@ Status CheckpointImpl::ExportColumnFamily(
|
|||
[&](const std::string& src_dirname, const std::string& fname) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
|
||||
cf_name.c_str(), fname.c_str());
|
||||
// FIXME: temperature handling
|
||||
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
|
||||
tmp_export_dir + fname, 0, db_options.use_fsync,
|
||||
nullptr, Temperature::kUnknown);
|
||||
Temperature::kUnknown, tmp_export_dir + fname,
|
||||
Temperature::kUnknown, 0, db_options.use_fsync,
|
||||
nullptr);
|
||||
} /*copy_file_cb*/);
|
||||
|
||||
const auto enable_status = db_->EnableFileDeletions();
|
||||
|
@ -467,4 +470,3 @@ Status CheckpointImpl::ExportFilesInMetaData(
|
|||
return s;
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
|
|
Loading…
Reference in New Issue