Add an ingestion option to not fill block cache (#13067)

Summary:
add `IngestExternalFileOptions::fill_cache` to allow users to ingest files without loading index/filter/data and other blocks into block cache during file ingestion. This can be useful when users are ingesting files into a CF that is not available to readers yet.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13067

Test Plan:
* unit test: `ExternalSSTFileTest.NoBlockCache`
* ran one round of crash test with fill_cache disabled: `python3 ./tools/db_crashtest.py --simple blackbox --ops_per_thread=1000000 --interval=30 --ingest_external_file_one_in=200 --level0_stop_writes_trigger=200 --level0_slowdown_writes_trigger=100 --sync_fault_injection=0 --disable_wal=0 --manual_wal_flush_one_in=0`

Reviewed By: jowlyzhang

Differential Revision: D64356424

Pulled By: cbi42

fbshipit-source-id: b380c26f5987238e1ed7d42ceef0390cfaa0b8e2
This commit is contained in:
Changyu Bi 2024-10-16 14:11:22 -07:00 committed by Facebook GitHub Bot
parent ecc084d301
commit 787730c859
8 changed files with 85 additions and 16 deletions

View file

@ -5829,7 +5829,6 @@ Status DBImpl::IngestExternalFile(
Status DBImpl::IngestExternalFiles( Status DBImpl::IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) { const std::vector<IngestExternalFileArg>& args) {
// TODO: plumb Env::IOActivity, Env::IOPriority // TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
const WriteOptions write_options; const WriteOptions write_options;
if (args.empty()) { if (args.empty()) {
@ -5855,6 +5854,10 @@ Status DBImpl::IngestExternalFiles(
snprintf(err_msg, 128, "external_files[%zu] is empty", i); snprintf(err_msg, 128, "external_files[%zu] is empty", i);
return Status::InvalidArgument(err_msg); return Status::InvalidArgument(err_msg);
} }
if (i && args[i].options.fill_cache != args[i - 1].options.fill_cache) {
return Status::InvalidArgument(
"fill_cache should be the same across ingestion options.");
}
} }
for (const auto& arg : args) { for (const auto& arg : args) {
const IngestExternalFileOptions& ingest_opts = arg.options; const IngestExternalFileOptions& ingest_opts = arg.options;
@ -6042,6 +6045,8 @@ Status DBImpl::IngestExternalFiles(
} }
} }
if (status.ok()) { if (status.ok()) {
ReadOptions read_options;
read_options.fill_cache = args[0].options.fill_cache;
autovector<ColumnFamilyData*> cfds_to_commit; autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists; autovector<autovector<VersionEdit*>> edit_lists;

View file

@ -567,13 +567,9 @@ void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
file_ingesting_compactions_.push_back(new Compaction( file_ingesting_compactions_.push_back(new Compaction(
cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options, cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options,
mutable_db_options_, {input}, output_level, mutable_db_options_, {input}, output_level,
MaxFileSizeForLevel( /* output file size limit not applicable */
mutable_cf_options, output_level, MaxFileSizeForLevel(mutable_cf_options, output_level,
cfd_->ioptions()->compaction_style) /* output file size cfd_->ioptions()->compaction_style),
limit,
* not applicable
*/
,
LLONG_MAX /* max compaction bytes, not applicable */, LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compression, 0 /* output path ID, not applicable */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, mutable_cf_options.compression_opts,
@ -727,7 +723,10 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
new RandomAccessFileReader(std::move(sst_file), external_file, new RandomAccessFileReader(std::move(sst_file), external_file,
nullptr /*Env*/, io_tracer_)); nullptr /*Env*/, io_tracer_));
table_reader->reset(); table_reader->reset();
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
status = cfd_->ioptions()->table_factory->NewTableReader( status = cfd_->ioptions()->table_factory->NewTableReader(
ro,
TableReaderOptions( TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor, *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(), env_options_, cfd_->internal_comparator(),
@ -739,7 +738,9 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
/*cur_file_num*/ new_file_number, /*cur_file_num*/ new_file_number,
/* unique_id */ {}, /* largest_seqno */ 0, /* unique_id */ {}, /* largest_seqno */ 0,
/* tail_size */ 0, user_defined_timestamps_persisted), /* tail_size */ 0, user_defined_timestamps_persisted),
std::move(sst_file_reader), file_to_ingest->file_size, table_reader); std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
// No need to prefetch index/filter if caching is not needed.
/*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
return status; return status;
} }
@ -885,6 +886,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
// TODO: plumb Env::IOActivity, Env::IOPriority // TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro; ReadOptions ro;
ro.readahead_size = ingestion_options_.verify_checksums_readahead_size; ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
ro.fill_cache = ingestion_options_.fill_cache;
status = table_reader->VerifyChecksum( status = table_reader->VerifyChecksum(
ro, TableReaderCaller::kExternalSSTIngestion); ro, TableReaderCaller::kExternalSSTIngestion);
if (!status.ok()) { if (!status.ok()) {
@ -895,6 +897,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ParsedInternalKey key; ParsedInternalKey key;
// TODO: plumb Env::IOActivity, Env::IOPriority // TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro; ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator( std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion)); /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
@ -1064,6 +1067,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
Arena arena; Arena arena;
// TODO: plumb Env::IOActivity, Env::IOPriority // TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro; ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
ro.total_order_seek = true; ro.total_order_seek = true;
int target_level = 0; int target_level = 0;
auto* vstorage = cfd_->current()->storage_info(); auto* vstorage = cfd_->current()->storage_info();

View file

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include <table/block_based/block_based_table_factory.h>
#include <functional> #include <functional>
#include <memory> #include <memory>
@ -150,7 +152,7 @@ class ExternalSSTFileTest
bool verify_checksums_before_ingest = true, bool ingest_behind = false, bool verify_checksums_before_ingest = true, bool ingest_behind = false,
bool sort_data = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr, std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) { ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) {
// Generate a file id if not provided // Generate a file id if not provided
if (file_id == -1) { if (file_id == -1) {
file_id = last_file_id_ + 1; file_id = last_file_id_ + 1;
@ -194,6 +196,7 @@ class ExternalSSTFileTest
ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false; ifo.write_global_seqno = allow_global_seqno ? write_global_seqno : false;
ifo.verify_checksums_before_ingest = verify_checksums_before_ingest; ifo.verify_checksums_before_ingest = verify_checksums_before_ingest;
ifo.ingest_behind = ingest_behind; ifo.ingest_behind = ingest_behind;
ifo.fill_cache = fill_cache;
if (cfh) { if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo); s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else { } else {
@ -267,15 +270,15 @@ class ExternalSSTFileTest
bool verify_checksums_before_ingest = true, bool ingest_behind = false, bool verify_checksums_before_ingest = true, bool ingest_behind = false,
bool sort_data = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr, std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) { ColumnFamilyHandle* cfh = nullptr, bool fill_cache = false) {
std::vector<std::pair<std::string, std::string>> file_data; std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) { for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + std::to_string(file_id)); file_data.emplace_back(Key(k), Key(k) + std::to_string(file_id));
} }
return GenerateAndAddExternalFile(options, file_data, file_id, return GenerateAndAddExternalFile(
allow_global_seqno, write_global_seqno, options, file_data, file_id, allow_global_seqno, write_global_seqno,
verify_checksums_before_ingest, verify_checksums_before_ingest, ingest_behind, sort_data, true_data,
ingest_behind, sort_data, true_data, cfh); cfh, fill_cache);
} }
Status DeprecatedAddFile(const std::vector<std::string>& files, Status DeprecatedAddFile(const std::vector<std::string>& files,
@ -314,6 +317,49 @@ TEST_F(ExternalSSTFileTest, ComparatorMismatch) {
ASSERT_NOK(DeprecatedAddFile({file})); ASSERT_NOK(DeprecatedAddFile({file}));
} }
TEST_F(ExternalSSTFileTest, NoBlockCache) {
LRUCacheOptions co;
co.capacity = 32 << 20;
std::shared_ptr<Cache> cache = NewLRUCache(co);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
table_options.cache_index_and_filter_blocks = true;
Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
size_t usage_before_ingestion = cache->GetUsage();
std::map<std::string, std::string> true_data;
// Ingest with fill_cache = true
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, false, false, true,
false, false, &true_data, nullptr,
/*fill_cache=*/true));
ASSERT_EQ(FilesPerLevel(), "0,0,0,0,0,0,1");
EXPECT_GT(cache->GetUsage(), usage_before_ingestion);
TablePropertiesCollection tp;
ASSERT_OK(db_->GetPropertiesOfAllTables(&tp));
for (const auto& entry : tp) {
EXPECT_GT(entry.second->index_size, 0);
EXPECT_GT(entry.second->filter_size, 0);
}
usage_before_ingestion = cache->GetUsage();
// Ingest with fill_cache = false
ASSERT_OK(GenerateAndAddExternalFile(options, {3, 4}, -1, false, false, true,
false, false, &true_data, nullptr,
/*fill_cache=*/false));
EXPECT_EQ(usage_before_ingestion, cache->GetUsage());
tp.clear();
ASSERT_OK(db_->GetPropertiesOfAllTables(&tp));
for (const auto& entry : tp) {
EXPECT_GT(entry.second->index_size, 0);
EXPECT_GT(entry.second->filter_size, 0);
}
}
TEST_F(ExternalSSTFileTest, Basic) { TEST_F(ExternalSSTFileTest, Basic) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();

View file

@ -2057,11 +2057,13 @@ class NonBatchedOpsStressTest : public StressTest {
ingest_options.verify_checksums_before_ingest = thread->rand.OneInOpt(2); ingest_options.verify_checksums_before_ingest = thread->rand.OneInOpt(2);
ingest_options.verify_checksums_readahead_size = ingest_options.verify_checksums_readahead_size =
thread->rand.OneInOpt(2) ? 1024 * 1024 : 0; thread->rand.OneInOpt(2) ? 1024 * 1024 : 0;
ingest_options.fill_cache = thread->rand.OneInOpt(4);
ingest_options_oss << "move_files: " << ingest_options.move_files ingest_options_oss << "move_files: " << ingest_options.move_files
<< ", verify_checksums_before_ingest: " << ", verify_checksums_before_ingest: "
<< ingest_options.verify_checksums_before_ingest << ingest_options.verify_checksums_before_ingest
<< ", verify_checksums_readahead_size: " << ", verify_checksums_readahead_size: "
<< ingest_options.verify_checksums_readahead_size; << ingest_options.verify_checksums_readahead_size
<< ", fill_cache: " << ingest_options.fill_cache;
s = db_->IngestExternalFile(column_families_[column_family], s = db_->IngestExternalFile(column_families_[column_family],
{sst_filename}, ingest_options); {sst_filename}, ingest_options);
} }

View file

@ -2271,6 +2271,14 @@ struct IngestExternalFileOptions {
// RepairDB() may not recover these files correctly, potentially leading to // RepairDB() may not recover these files correctly, potentially leading to
// data loss. // data loss.
bool allow_db_generated_files = false; bool allow_db_generated_files = false;
// Controls whether data and metadata blocks (e.g. index, filter) read during
// file ingestion will be added to block cache.
// Users may wish to set this to false when bulk loading into a CF that is not
// available for reads yet.
// When ingesting to multiple families, this option should be the same across
// ingestion options.
bool fill_cache = true;
}; };
enum TraceFilterType : uint64_t { enum TraceFilterType : uint64_t {

View file

@ -653,6 +653,7 @@ Status BlockBasedTable::Open(
ro.rate_limiter_priority = read_options.rate_limiter_priority; ro.rate_limiter_priority = read_options.rate_limiter_priority;
ro.verify_checksums = read_options.verify_checksums; ro.verify_checksums = read_options.verify_checksums;
ro.io_activity = read_options.io_activity; ro.io_activity = read_options.io_activity;
ro.fill_cache = read_options.fill_cache;
// prefetch both index and filters, down to all partitions // prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;

View file

@ -500,6 +500,8 @@ class BlockBasedTable : public TableReader {
InternalIterator* meta_iter, InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator, const InternalKeyComparator& internal_comparator,
BlockCacheLookupContext* lookup_context); BlockCacheLookupContext* lookup_context);
// If index and filter blocks do not need to be pinned, `prefetch_all`
// determines whether they will be read and add to cache.
Status PrefetchIndexAndFilterBlocks( Status PrefetchIndexAndFilterBlocks(
const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table, InternalIterator* meta_iter, BlockBasedTable* new_table,

View file

@ -0,0 +1 @@
* Add a new file ingestion option `IngestExternalFileOptions::fill_cache` to support not adding blocks from ingested files into block cache during file ingestion.