Refactor `table_factory` into MutableCFOptions (#13077)

Summary:
This is setting up for a fix to a data race in SetOptions on BlockBasedTableOptions (BBTO); see https://github.com/facebook/rocksdb/issues/10079
The race will be fixed by replacing `table_factory` with a modified copy whenever we want to modify a BBTO field.
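
To illustrate the intended approach, here is a minimal copy-on-replace sketch. It is not the actual RocksDB implementation; all type and function names below are hypothetical stand-ins for `MutableCFOptions`, the table factory, and a BBTO field.

```cpp
#include <cstddef>
#include <memory>

struct BlockBasedTableOptionsSketch {
  std::size_t block_size = 4096;
};

struct TableFactorySketch {
  explicit TableFactorySketch(BlockBasedTableOptionsSketch opts)
      : options(opts) {}
  BlockBasedTableOptionsSketch options;
};

struct MutableCFOptionsSketch {
  std::shared_ptr<TableFactorySketch> table_factory;
};

// A SetOptions-style update: build a replacement factory, never mutate the
// live one in place.
MutableCFOptionsSketch WithNewBlockSize(const MutableCFOptionsSketch& cur,
                                        std::size_t new_block_size) {
  // Copy the BBTO out of the current factory, tweak the copy, and wrap it in
  // a brand-new factory; the factory object readers already hold is never
  // written to.
  BlockBasedTableOptionsSketch new_bbto = cur.table_factory->options;
  new_bbto.block_size = new_block_size;
  MutableCFOptionsSketch next = cur;  // other mutable options carry over
  next.table_factory = std::make_shared<TableFactorySketch>(new_bbto);
  return next;
}
```

Readers that already hold the old copy keep using the old factory unchanged; only newly published copies see the modified BBTO, which is what removes the race.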

An argument could be made that this change creates more entanglement between features (e.g. BlobSource <-> MutableCFOptions) rather than (conceptually) minimizing the dependencies of each feature, but:
* Most of these things already depended on ImmutableOptions
* Historically there has been a lot of plumbing (and possible small CPU overhead) involved in adding features that need to reach a lot of places, like `block_protection_bytes_per_key`. Keeping those wrapped up in options simplifies that.
* SuperVersion management generally takes care of lifetime management of MutableCFOptions, so this is not that difficult; see the sketch just below this list. (Crash test agrees so far.)
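
A rough sketch (hypothetical names, not the RocksDB `SuperVersion` class itself) of why reference counting at the SuperVersion level settles the lifetime question: a reader pins the SuperVersion, the SuperVersion owns a copy of MutableCFOptions, and that copy keeps the `table_factory` shared_ptr alive until the read finishes.

```cpp
#include <atomic>
#include <memory>

struct TableFactorySketch {};

struct MutableCFOptionsSketch {
  std::shared_ptr<TableFactorySketch> table_factory;
};

struct SuperVersionSketch {
  MutableCFOptionsSketch mutable_cf_options;  // copy installed at creation
  std::atomic<int> refs{1};

  void Ref() { refs.fetch_add(1, std::memory_order_relaxed); }
  // Returns true when the caller dropped the last reference and should
  // delete this SuperVersion (releasing its MutableCFOptions copy, and with
  // it possibly the last reference to an old table_factory).
  bool Unref() { return refs.fetch_sub(1, std::memory_order_acq_rel) == 1; }
};
```

A read path pins the SuperVersion before touching `mutable_cf_options`, and a SetOptions call installs a fresh SuperVersion carrying the new copy, so an old `table_factory` stays alive exactly as long as some in-flight read still uses it.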

There are some FIXME places where it is known to be unsafe to replace `block_cache` unless/until we handle shared_ptr tracking properly. HOWEVER, replacing `block_cache` is generally dubious anyway, at least while existing users of the old block cache (e.g. table readers) can keep using it indefinitely.

The change to cf_options.cc is essentially just moving code, not changing it.

I'm not concerned about the performance of copying another shared_ptr with MutableCFOptions, but I left a note about considering an improvement if more shared_ptrs are added to it.
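
One hypothetical shape such an improvement could take (not something this PR implements; all names below are sketches): group the shared_ptr members behind a single immutable bundle so each MutableCFOptions copy costs one ref-count increment instead of one per member.

```cpp
#include <memory>

struct TableFactorySketch {};
struct SstPartitionerFactorySketch {};

// All shared_ptr members live behind one immutable bundle.
struct SharedPtrBundleSketch {
  std::shared_ptr<TableFactorySketch> table_factory;
  std::shared_ptr<SstPartitionerFactorySketch> sst_partitioner_factory;
};

struct MutableCFOptionsSketch {
  // Copying this struct bumps one ref count, no matter how many factories
  // the bundle grows to hold; replacing any member means building a new
  // bundle (copy-on-write), as in the earlier sketch.
  std::shared_ptr<const SharedPtrBundleSketch> shared;
};
```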

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13077

Test Plan:
Existing tests, crash test.

Unit test DBOptionsTest.GetLatestCFOptions was updated with some temporary logic. MemoryTest required some refactoring (simplification) for the change.

Reviewed By: cbi42

Differential Revision: D64546903

Pulled By: pdillinger

fbshipit-source-id: 69ae97ce5cf4c01b58edc4c5d4687eb1e5bf5855
Peter Dillinger 2024-10-17 14:13:20 -07:00 committed by Facebook GitHub Bot
parent a44f4b8653
commit ac24f152a1
36 changed files with 399 additions and 450 deletions

View File

@ -20,23 +20,24 @@
namespace ROCKSDB_NAMESPACE {
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
BlobSource::BlobSource(const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const std::string& db_id,
const std::string& db_session_id,
BlobFileCache* blob_file_cache)
: db_id_(db_id),
db_session_id_(db_session_id),
statistics_(immutable_options->statistics.get()),
statistics_(immutable_options.statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache),
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
blob_cache_(immutable_options.blob_cache),
lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) {
auto bbto =
immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto &&
bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
.charged == CacheEntryRoleOptions::Decision::kEnabled) {
blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>(
immutable_options->blob_cache, bbto->block_cache)};
immutable_options.blob_cache, bbto->block_cache)};
}
}

View File

@ -21,6 +21,7 @@
namespace ROCKSDB_NAMESPACE {
struct ImmutableOptions;
struct MutableCFOptions;
class Status;
class FilePrefetchBuffer;
class Slice;
@ -31,7 +32,10 @@ class Slice;
// storage with minimal cost.
class BlobSource {
public:
BlobSource(const ImmutableOptions* immutable_options,
// NOTE: db_id, db_session_id, and blob_file_cache are saved by reference or
// pointer.
BlobSource(const ImmutableOptions& immutable_options,
const MutableCFOptions& mutable_cf_options,
const std::string& db_id, const std::string& db_session_id,
BlobFileCache* blob_file_cache);

View File

@ -148,6 +148,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -193,8 +194,8 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
@ -464,6 +465,7 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -498,8 +500,8 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
@ -589,6 +591,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -644,8 +647,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
@ -782,6 +785,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -827,8 +831,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
@ -1105,6 +1109,7 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -1137,8 +1142,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
CacheHandleGuard<BlobFileReader> file_reader;
ReadOptions read_options;
@ -1405,6 +1410,7 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr ExpirationRange expiration_range;
@ -1426,8 +1432,8 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())
@ -1519,6 +1525,8 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservation) {
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
MutableCFOptions mutable_cf_options(options_);
constexpr size_t blob_size = 24 << 10; // 24KB
for (size_t i = 0; i < kNumBlobs; ++i) {
blob_file_size_ -= blobs_[i].size(); // old blob size
@ -1546,8 +1554,8 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservation) {
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
BlobSource blob_source(immutable_options, mutable_cf_options, db_id_,
db_session_id_, blob_file_cache.get());
ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())

View File

@ -53,7 +53,7 @@ TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions,
assert((tboptions.column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
tboptions.column_family_name.empty());
return tboptions.ioptions.table_factory->NewTableBuilder(tboptions, file);
return tboptions.moptions.table_factory->NewTableBuilder(tboptions, file);
}
Status BuildTable(
@ -420,8 +420,7 @@ Status BuildTable(
// the goal is to cache it here for further user reads.
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
tboptions.read_options, file_options, tboptions.internal_comparator,
*meta, nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor, nullptr,
*meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
@ -429,8 +428,7 @@ Status BuildTable(
MaxFileSizeForL0MetaPin(mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key*/ nullptr,
/*allow_unprepared_value*/ false,
mutable_cf_options.block_protection_bytes_per_key));
/*allow_unprepared_value*/ false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
OutputValidator file_validator(tboptions.internal_comparator,

View File

@ -595,8 +595,8 @@ ColumnFamilyData::ColumnFamilyData(
blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
internal_stats_->GetBlobFileReadHist(), io_tracer));
blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
blob_file_cache_.get()));
blob_source_.reset(new BlobSource(ioptions_, mutable_cf_options_, db_id,
db_session_id, blob_file_cache_.get()));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(

View File

@ -865,7 +865,7 @@ bool Compaction::ShouldFormSubcompactions() const {
return false;
}
if (cfd_->ioptions()->table_factory->Name() ==
if (mutable_cf_options_.table_factory->Name() ==
TableFactory::kPlainTableName()) {
return false;
}

View File

@ -469,7 +469,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
ReadOptions read_options(Env::IOActivity::kCompaction);
read_options.rate_limiter_priority = GetRateLimiterPriority();
auto* c = compact_->compaction;
if (c->immutable_options()->table_factory->Name() ==
if (c->mutable_cf_options()->table_factory->Name() ==
TableFactory::kPlainTableName()) {
return;
}
@ -506,9 +506,7 @@ void CompactionJob::GenSubcompactionBoundaries() {
FileMetaData* f = flevel->files[i].file_metadata;
std::vector<TableReader::Anchor> my_anchors;
Status s = cfd->table_cache()->ApproximateKeyAnchors(
read_options, icomp, *f,
c->mutable_cf_options()->block_protection_bytes_per_key,
my_anchors);
read_options, icomp, *f, *c->mutable_cf_options(), my_anchors);
if (!s.ok() || my_anchors.empty()) {
my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
@ -711,8 +709,6 @@ Status CompactionJob::Run() {
}
}
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto& prefix_extractor =
compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
while (true) {
@ -733,7 +729,8 @@ Status CompactionJob::Run() {
InternalIterator* iter = cfd->table_cache()->NewIterator(
verify_table_read_options, file_options_,
cfd->internal_comparator(), files_output[file_idx]->meta,
/*range_del_agg=*/nullptr, prefix_extractor,
/*range_del_agg=*/nullptr,
*compact_->compaction->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
@ -743,9 +740,7 @@ Status CompactionJob::Run() {
*compact_->compaction->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false,
compact_->compaction->mutable_cf_options()
->block_protection_bytes_per_key);
/*allow_unprepared_value=*/false);
auto s = iter->status();
if (s.ok() && paranoid_file_checks_) {

View File

@ -250,6 +250,7 @@ class CompactionJobTestBase : public testing::Test {
} else {
assert(false);
}
mutable_cf_options_.table_factory = cf_options_.table_factory;
}
std::string GenerateFileName(uint64_t file_number) {

View File

@ -87,7 +87,7 @@ Status VerifySstFileChecksumInternal(const Options& options,
options.block_protection_bytes_per_key, false /* skip_filters */,
!kImmortal, false /* force_direct_prefetch */, -1 /* level */);
reader_options.largest_seqno = largest_seqno;
s = ioptions.table_factory->NewTableReader(
s = options.table_factory->NewTableReader(
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {

View File

@ -1153,6 +1153,13 @@ void DBImpl::DumpStats() {
continue;
}
auto* table_factory =
cfd->GetCurrentMutableCFOptions()->table_factory.get();
assert(table_factory != nullptr);
// FIXME: need to a shared_ptr if/when block_cache is going to be mutable
Cache* cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
// Release DB mutex for gathering cache entry stats. Pass over all
// column families for this first so that other stats are dumped
// near-atomically.
@ -1161,10 +1168,6 @@ void DBImpl::DumpStats() {
// Probe block cache for problems (if not already via another CF)
if (immutable_db_options_.info_log) {
auto* table_factory = cfd->ioptions()->table_factory.get();
assert(table_factory != nullptr);
Cache* cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
if (cache && probed_caches.insert(cache).second) {
cache->ReportProblems(immutable_db_options_.info_log);
}

View File

@ -1199,9 +1199,7 @@ class DBImpl : public DB {
uint64_t TEST_total_log_size() const { return total_log_size_; }
// Returns column family name to ImmutableCFOptions map.
Status TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
void TEST_GetAllBlockCaches(std::unordered_set<const Cache*>* cache_set);
// Return the lastest MutableCFOptions of a column family
Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,

View File

@ -232,23 +232,16 @@ uint64_t DBImpl::TEST_LogfileNumber() {
return logfile_number_;
}
Status DBImpl::TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map) {
std::vector<std::string> cf_names;
std::vector<const ImmutableCFOptions*> iopts;
{
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cf_names.push_back(cfd->GetName());
iopts.push_back(cfd->ioptions());
void DBImpl::TEST_GetAllBlockCaches(
std::unordered_set<const Cache*>* cache_set) {
InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (const auto bbto =
cfd->GetCurrentMutableCFOptions()
->table_factory->GetOptions<BlockBasedTableOptions>()) {
cache_set->insert(bbto->block_cache.get());
}
}
iopts_map->clear();
for (size_t i = 0; i < cf_names.size(); ++i) {
iopts_map->insert({cf_names[i], iopts[i]});
}
return Status::OK();
}
uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {

View File

@ -186,6 +186,14 @@ TEST_F(DBOptionsTest, GetLatestDBOptions) {
ASSERT_EQ(new_options, GetMutableDBOptionsMap(dbfull()->GetDBOptions()));
}
// FIXME osolete when table_factory is mutable
static std::unordered_map<std::string, std::string> WithoutTableFactory(
const std::unordered_map<std::string, std::string>& opts) {
auto opts_copy = opts;
opts_copy.erase("table_factory");
return opts_copy;
}
TEST_F(DBOptionsTest, GetLatestCFOptions) {
// GetOptions should be able to get latest option changed by SetOptions.
Options options;
@ -195,14 +203,16 @@ TEST_F(DBOptionsTest, GetLatestCFOptions) {
Reopen(options);
CreateColumnFamilies({"foo"}, options);
ReopenWithColumnFamilies({"default", "foo"}, options);
auto options_default = GetRandomizedMutableCFOptionsMap(&rnd);
auto options_foo = GetRandomizedMutableCFOptionsMap(&rnd);
auto options_default =
WithoutTableFactory(GetRandomizedMutableCFOptionsMap(&rnd));
auto options_foo =
WithoutTableFactory(GetRandomizedMutableCFOptionsMap(&rnd));
ASSERT_OK(dbfull()->SetOptions(handles_[0], options_default));
ASSERT_OK(dbfull()->SetOptions(handles_[1], options_foo));
ASSERT_EQ(options_default,
GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[0])));
ASSERT_EQ(options_foo,
GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[1])));
ASSERT_EQ(options_default, WithoutTableFactory(GetMutableCFOptionsMap(
dbfull()->GetOptions(handles_[0]))));
ASSERT_EQ(options_foo, WithoutTableFactory(GetMutableCFOptionsMap(
dbfull()->GetOptions(handles_[1]))));
}
TEST_F(DBOptionsTest, SetMutableTableOptions) {

View File

@ -725,7 +725,7 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
table_reader->reset();
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
status = cfd_->ioptions()->table_factory->NewTableReader(
status = sv->mutable_cf_options.table_factory->NewTableReader(
ro,
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
@ -920,7 +920,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
file_to_ingest->smallest_internal_key.SetFrom(key);
Slice largest;
if (strcmp(cfd_->ioptions()->table_factory->Name(), "PlainTable") == 0) {
if (strcmp(sv->mutable_cf_options.table_factory->Name(), "PlainTable") ==
0) {
// PlainTable iterator does not support SeekToLast().
largest = iter->key();
for (; iter->Valid(); iter->Next()) {

View File

@ -32,11 +32,11 @@ namespace ROCKSDB_NAMESPACE {
// iter.Next()
class ForwardLevelIterator : public InternalIterator {
public:
ForwardLevelIterator(
const ColumnFamilyData* const cfd, const ReadOptions& read_options,
const std::vector<FileMetaData*>& files,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
bool allow_unprepared_value, uint8_t block_protection_bytes_per_key)
ForwardLevelIterator(const ColumnFamilyData* const cfd,
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files,
const MutableCFOptions& mutable_cf_options,
bool allow_unprepared_value)
: cfd_(cfd),
read_options_(read_options),
files_(files),
@ -44,9 +44,8 @@ class ForwardLevelIterator : public InternalIterator {
file_index_(std::numeric_limits<uint32_t>::max()),
file_iter_(nullptr),
pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor),
allow_unprepared_value_(allow_unprepared_value),
block_protection_bytes_per_key_(block_protection_bytes_per_key) {
mutable_cf_options_(mutable_cf_options),
allow_unprepared_value_(allow_unprepared_value) {
status_.PermitUncheckedError(); // Allow uninitialized status through
}
@ -83,13 +82,12 @@ class ForwardLevelIterator : public InternalIterator {
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
*files_[file_index_],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
prefix_extractor_, /*table_reader_ptr=*/nullptr,
mutable_cf_options_, /*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
block_protection_bytes_per_key_);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
valid_ = false;
if (!range_del_agg.IsEmpty()) {
@ -214,10 +212,9 @@ class ForwardLevelIterator : public InternalIterator {
Status status_;
InternalIterator* file_iter_;
PinnedIteratorsManager* pinned_iters_mgr_;
// Kept alive by ForwardIterator::sv_->mutable_cf_options
const std::shared_ptr<const SliceTransform>& prefix_extractor_;
const MutableCFOptions& mutable_cf_options_;
const bool allow_unprepared_value_;
const uint8_t block_protection_bytes_per_key_;
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
@ -750,14 +747,13 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
sv_->mutable_cf_options.prefix_extractor,
sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
}
BuildLevelIterators(vstorage, sv_);
current_ = nullptr;
@ -838,14 +834,13 @@ void ForwardIterator::RenewIterators() {
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files_new[inew],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
svnew->mutable_cf_options.prefix_extractor,
svnew->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
svnew->mutable_cf_options.block_protection_bytes_per_key));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
}
for (auto* f : l0_iters_) {
@ -888,9 +883,8 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
}
} else {
level_iters_.push_back(new ForwardLevelIterator(
cfd_, read_options_, level_files,
sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_,
sv->mutable_cf_options.block_protection_bytes_per_key));
cfd_, read_options_, level_files, sv->mutable_cf_options,
allow_unprepared_value_));
}
}
}
@ -905,15 +899,13 @@ void ForwardIterator::ResetIncompleteIterators() {
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files[i], /*range_del_agg=*/nullptr,
sv_->mutable_cf_options.prefix_extractor,
*l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}

View File

@ -329,7 +329,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
// TODO(yuzhangyu): User-defined timestamps doesn't support importing column
// family. Pass in the correct `user_defined_timestamps_persisted` flag for
// creating `TableReaderOptions` when the support is there.
status = cfd_->ioptions()->table_factory->NewTableReader(
status = sv->mutable_cf_options.table_factory->NewTableReader(
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
@ -371,7 +371,8 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
if (iter->Valid()) {
file_to_import->smallest_internal_key.DecodeFrom(iter->key());
Slice largest;
if (strcmp(cfd_->ioptions()->table_factory->Name(), "PlainTable") == 0) {
if (strcmp(sv->mutable_cf_options.table_factory->Name(), "PlainTable") ==
0) {
// PlainTable iterator does not support SeekToLast().
largest = iter->key();
for (; iter->Valid(); iter->Next()) {

View File

@ -1495,8 +1495,10 @@ bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
}
Cache* InternalStats::GetBlockCacheForStats() {
auto* table_factory = cfd_->ioptions()->table_factory.get();
// NOTE: called in startup before GetCurrentMutableCFOptions() is ready
auto* table_factory = cfd_->GetLatestMutableCFOptions()->table_factory.get();
assert(table_factory != nullptr);
// FIXME: need to a shared_ptr if/when block_cache is going to be mutable
return table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
}
@ -2161,7 +2163,8 @@ class BlockCachePropertyAggregator : public IntPropertyAggregator {
virtual ~BlockCachePropertyAggregator() override = default;
void Add(ColumnFamilyData* cfd, uint64_t value) override {
auto* table_factory = cfd->ioptions()->table_factory.get();
auto* table_factory =
cfd->GetCurrentMutableCFOptions()->table_factory.get();
assert(table_factory != nullptr);
Cache* cache =
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());

View File

@ -105,6 +105,7 @@ class Repairer {
SanitizeOptions(immutable_db_options_, default_cf_opts)),
default_iopts_(
ImmutableOptions(immutable_db_options_, default_cf_opts_)),
default_mopts_(MutableCFOptions(default_cf_opts_)),
unknown_cf_opts_(
SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
create_unknown_cfs_(create_unknown_cfs),
@ -261,6 +262,7 @@ class Repairer {
const InternalKeyComparator icmp_;
const ColumnFamilyOptions default_cf_opts_;
const ImmutableOptions default_iopts_; // table_cache_ holds reference
const MutableCFOptions default_mopts_;
const ColumnFamilyOptions unknown_cf_opts_;
const bool create_unknown_cfs_;
std::shared_ptr<Cache> raw_table_cache_;
@ -537,8 +539,7 @@ class Repairer {
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
status = table_cache_->GetTableProperties(
file_options_, read_options, icmp_, t->meta, &props,
0 /* block_protection_bytes_per_key */);
file_options_, read_options, icmp_, t->meta, &props, default_mopts_);
}
if (status.ok()) {
auto s =
@ -602,15 +603,13 @@ class Repairer {
ropts.total_order_seek = true;
InternalIterator* iter = table_cache_->NewIterator(
ropts, file_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */,
cfd->GetLatestMutableCFOptions()->prefix_extractor,
nullptr /* range_del_agg */, *cfd->GetLatestMutableCFOptions(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
/*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false,
cfd->GetLatestMutableCFOptions()->block_protection_bytes_per_key);
/*allow_unprepared_value=*/false);
ParsedInternalKey parsed;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
@ -651,8 +650,7 @@ class Repairer {
std::unique_ptr<FragmentedRangeTombstoneIterator> r_iter;
status = table_cache_->GetRangeTombstoneIterator(
ropts, cfd->internal_comparator(), t->meta,
cfd->GetLatestMutableCFOptions()->block_protection_bytes_per_key,
&r_iter);
*cfd->GetLatestMutableCFOptions(), &r_iter);
if (r_iter) {
r_iter->SeekToFirst();

View File

@ -91,10 +91,9 @@ Status TableCache::GetTableReader(
const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, bool sequential_mode,
uint8_t block_protection_bytes_per_key, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
HistogramImpl* file_read_hist, std::unique_ptr<TableReader>* table_reader,
const MutableCFOptions& mutable_cf_options, bool skip_filters, int level,
bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
std::string fname = TableFileName(
ioptions_.cf_paths, file_meta.fd.GetNumber(), file_meta.fd.GetPathId());
@ -146,13 +145,14 @@ Status TableCache::GetTableReader(
} else {
expected_unique_id = kNullUniqueId64x2; // null ID == no verification
}
s = ioptions_.table_factory->NewTableReader(
s = mutable_cf_options.table_factory->NewTableReader(
ro,
TableReaderOptions(
ioptions_, prefix_extractor, file_options, internal_comparator,
block_protection_bytes_per_key, skip_filters, immortal_tables_,
false /* force_direct_prefetch */, level, block_cache_tracer_,
max_file_size_for_l0_meta_pin, db_session_id_,
ioptions_, mutable_cf_options.prefix_extractor, file_options,
internal_comparator,
mutable_cf_options.block_protection_bytes_per_key, skip_filters,
immortal_tables_, false /* force_direct_prefetch */, level,
block_cache_tracer_, max_file_size_for_l0_meta_pin, db_session_id_,
file_meta.fd.GetNumber(), expected_unique_id,
file_meta.fd.largest_seqno, file_meta.tail_size,
file_meta.user_defined_timestamps_persisted),
@ -172,10 +172,9 @@ Status TableCache::FindTable(
const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, TypedHandle** handle,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const bool no_io, HistogramImpl* file_read_hist, bool skip_filters,
int level, bool prefetch_index_and_filter_in_cache,
const MutableCFOptions& mutable_cf_options, const bool no_io,
HistogramImpl* file_read_hist, bool skip_filters, int level,
bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = file_meta.fd.GetNumber();
@ -197,9 +196,8 @@ Status TableCache::FindTable(
std::unique_ptr<TableReader> table_reader;
Status s = GetTableReader(ro, file_options, internal_comparator, file_meta,
false /* sequential mode */,
block_protection_bytes_per_key, file_read_hist,
&table_reader, prefix_extractor, skip_filters,
false /* sequential mode */, file_read_hist,
&table_reader, mutable_cf_options, skip_filters,
level, prefetch_index_and_filter_in_cache,
max_file_size_for_l0_meta_pin, file_temperature);
if (!s.ok()) {
@ -223,13 +221,12 @@ InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const FileOptions& file_options,
const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const MutableCFOptions& mutable_cf_options, TableReader** table_reader_ptr,
HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena,
bool skip_filters, int level, size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno,
const SequenceNumber* read_seqno,
std::unique_ptr<TruncatedRangeDelIterator>* range_del_iter) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
@ -244,7 +241,7 @@ InternalIterator* TableCache::NewIterator(
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(options, file_options, icomparator, file_meta, &handle,
block_protection_bytes_per_key, prefix_extractor,
mutable_cf_options,
options.read_tier == kBlockCacheTier /* no_io */,
file_read_hist, skip_filters, level,
true /* prefetch_index_and_filter_in_cache */,
@ -260,8 +257,9 @@ InternalIterator* TableCache::NewIterator(
result = NewEmptyInternalIterator<Slice>(arena);
} else {
result = table_reader->NewIterator(
options, prefix_extractor.get(), arena, skip_filters, caller,
file_options.compaction_readahead_size, allow_unprepared_value);
options, mutable_cf_options.prefix_extractor.get(), arena,
skip_filters, caller, file_options.compaction_readahead_size,
allow_unprepared_value);
}
if (handle != nullptr) {
cache_.RegisterReleaseAsCleanup(handle, *result);
@ -328,7 +326,7 @@ InternalIterator* TableCache::NewIterator(
Status TableCache::GetRangeTombstoneIterator(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, uint8_t block_protection_bytes_per_key,
const FileMetaData& file_meta, const MutableCFOptions& mutable_cf_options,
std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter) {
assert(out_iter);
const FileDescriptor& fd = file_meta.fd;
@ -337,7 +335,7 @@ Status TableCache::GetRangeTombstoneIterator(
TypedHandle* handle = nullptr;
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, block_protection_bytes_per_key);
&handle, mutable_cf_options);
if (s.ok()) {
t = cache_.Value(handle);
}
@ -429,14 +427,13 @@ bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
return found;
}
Status TableCache::Get(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k, GetContext* get_context,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin) {
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k,
GetContext* get_context,
const MutableCFOptions& mutable_cf_options,
HistogramImpl* file_read_hist, bool skip_filters,
int level, size_t max_file_size_for_l0_meta_pin) {
auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
bool done = false;
@ -461,7 +458,7 @@ Status TableCache::Get(
if (s.ok() && !done) {
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, block_protection_bytes_per_key, prefix_extractor,
&handle, mutable_cf_options,
options.read_tier == kBlockCacheTier /* no_io */,
file_read_hist, skip_filters, level,
true /* prefetch_index_and_filter_in_cache */,
@ -490,7 +487,8 @@ Status TableCache::Get(
}
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, prefix_extractor.get(), skip_filters);
s = t->Get(options, k, get_context,
mutable_cf_options.prefix_extractor.get(), skip_filters);
get_context->SetReplayLog(nullptr);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find table in cache and couldn't open it because of no_io.
@ -543,11 +541,9 @@ void TableCache::UpdateRangeTombstoneSeqnums(
Status TableCache::MultiGetFilter(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const FileMetaData& file_meta, const MutableCFOptions& mutable_cf_options,
HistogramImpl* file_read_hist, int level,
MultiGetContext::Range* mget_range, TypedHandle** table_handle,
uint8_t block_protection_bytes_per_key) {
MultiGetContext::Range* mget_range, TypedHandle** table_handle) {
auto& fd = file_meta.fd;
IterKey row_cache_key;
std::string row_cache_entry_buffer;
@ -566,7 +562,7 @@ Status TableCache::MultiGetFilter(
mget_range->end());
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, block_protection_bytes_per_key, prefix_extractor,
&handle, mutable_cf_options,
options.read_tier == kBlockCacheTier /* no_io */,
file_read_hist,
/*skip_filters=*/false, level,
@ -578,7 +574,8 @@ Status TableCache::MultiGetFilter(
*table_handle = handle;
}
if (s.ok()) {
s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range);
s = t->MultiGetFilter(options, mutable_cf_options.prefix_extractor.get(),
mget_range);
}
if (s.ok() && !options.ignore_range_deletions) {
// Update the range tombstone sequence numbers for the keys here
@ -599,8 +596,7 @@ Status TableCache::GetTableProperties(
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
std::shared_ptr<const TableProperties>* properties,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor, bool no_io) {
const MutableCFOptions& mutable_cf_options, bool no_io) {
auto table_reader = file_meta.fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
@ -611,8 +607,7 @@ Status TableCache::GetTableProperties(
TypedHandle* table_handle = nullptr;
Status s = FindTable(read_options, file_options, internal_comparator,
file_meta, &table_handle, block_protection_bytes_per_key,
prefix_extractor, no_io);
file_meta, &table_handle, mutable_cf_options, no_io);
if (!s.ok()) {
return s;
}
@ -625,14 +620,15 @@ Status TableCache::GetTableProperties(
Status TableCache::ApproximateKeyAnchors(
const ReadOptions& ro, const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, uint8_t block_protection_bytes_per_key,
const FileMetaData& file_meta, const MutableCFOptions& mutable_cf_options,
std::vector<TableReader::Anchor>& anchors) {
Status s;
TableReader* t = file_meta.fd.table_reader;
TypedHandle* handle = nullptr;
if (t == nullptr) {
s = FindTable(ro, file_options_, internal_comparator, file_meta, &handle,
block_protection_bytes_per_key);
mutable_cf_options);
if (s.ok()) {
t = cache_.Value(handle);
}
@ -649,8 +645,7 @@ Status TableCache::ApproximateKeyAnchors(
size_t TableCache::GetMemoryUsageByTableReader(
const FileOptions& file_options, const ReadOptions& read_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor) {
const FileMetaData& file_meta, const MutableCFOptions& mutable_cf_options) {
auto table_reader = file_meta.fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
@ -658,9 +653,9 @@ size_t TableCache::GetMemoryUsageByTableReader(
}
TypedHandle* table_handle = nullptr;
Status s = FindTable(read_options, file_options, internal_comparator,
file_meta, &table_handle, block_protection_bytes_per_key,
prefix_extractor, true /* no_io */);
Status s =
FindTable(read_options, file_options, internal_comparator, file_meta,
&table_handle, mutable_cf_options, true /* no_io */);
if (!s.ok()) {
return 0;
}
@ -679,16 +674,14 @@ uint64_t TableCache::ApproximateOffsetOf(
const ReadOptions& read_options, const Slice& key,
const FileMetaData& file_meta, TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor) {
const MutableCFOptions& mutable_cf_options) {
uint64_t result = 0;
TableReader* table_reader = file_meta.fd.table_reader;
TypedHandle* table_handle = nullptr;
if (table_reader == nullptr) {
Status s =
FindTable(read_options, file_options_, internal_comparator, file_meta,
&table_handle, block_protection_bytes_per_key,
prefix_extractor, false /* no_io */);
&table_handle, mutable_cf_options, false /* no_io */);
if (s.ok()) {
table_reader = cache_.Value(table_handle);
}
@ -708,16 +701,14 @@ uint64_t TableCache::ApproximateSize(
const ReadOptions& read_options, const Slice& start, const Slice& end,
const FileMetaData& file_meta, TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor) {
const MutableCFOptions& mutable_cf_options) {
uint64_t result = 0;
TableReader* table_reader = file_meta.fd.table_reader;
TypedHandle* table_handle = nullptr;
if (table_reader == nullptr) {
Status s =
FindTable(read_options, file_options_, internal_comparator, file_meta,
&table_handle, block_protection_bytes_per_key,
prefix_extractor, false /* no_io */);
&table_handle, mutable_cf_options, false /* no_io */);
if (s.ok()) {
table_reader = cache_.Value(table_handle);
}

View File

@ -92,13 +92,12 @@ class TableCache {
const ReadOptions& options, const FileOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const MutableCFOptions& mutable_cf_options,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
uint8_t protection_bytes_per_key,
const SequenceNumber* range_del_read_seqno = nullptr,
std::unique_ptr<TruncatedRangeDelIterator>* range_del_iter = nullptr);
@ -112,21 +111,20 @@ class TableCache {
// recorded
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k, GetContext* get_context,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1, size_t max_file_size_for_l0_meta_pin = 0);
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k,
GetContext* get_context,
const MutableCFOptions& mutable_cf_options,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1, size_t max_file_size_for_l0_meta_pin = 0);
// Return the range delete tombstone iterator of the file specified by
// `file_meta`.
Status GetRangeTombstoneIterator(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, uint8_t block_protection_bytes_per_key,
const FileMetaData& file_meta, const MutableCFOptions& mutable_cf_options,
std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter);
// Call table reader's MultiGetFilter to use the bloom filter to filter out
@ -134,14 +132,13 @@ class TableCache {
// If the table cache is looked up to get the table reader, the cache handle
// is returned in table_handle. This handle should be passed back to
// MultiGet() so it can be released.
Status MultiGetFilter(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, int level,
MultiGetContext::Range* mget_range, TypedHandle** table_handle,
uint8_t block_protection_bytes_per_key);
Status MultiGetFilter(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const MutableCFOptions& mutable_cf_options,
HistogramImpl* file_read_hist, int level,
MultiGetContext::Range* mget_range,
TypedHandle** table_handle);
// If a seek to internal key "k" in specified file finds an entry,
// call get_context->SaveValue() repeatedly until
@ -152,15 +149,15 @@ class TableCache {
// in the embedded GetContext
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
DECLARE_SYNC_AND_ASYNC(
Status, MultiGet, const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
bool skip_range_deletions = false, int level = -1,
TypedHandle* table_handle = nullptr);
DECLARE_SYNC_AND_ASYNC(Status, MultiGet, const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const MultiGetContext::Range* mget_range,
const MutableCFOptions& mutable_cf_options,
HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false,
bool skip_range_deletions = false, int level = -1,
TypedHandle* table_handle = nullptr);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);
@ -176,17 +173,16 @@ class TableCache {
// Find table reader
// @param skip_filters Disables loading/accessing the filter block
// @param level == -1 means not specified
Status FindTable(
const ReadOptions& ro, const FileOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, TypedHandle**,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
const bool no_io = false, HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true,
size_t max_file_size_for_l0_meta_pin = 0,
Temperature file_temperature = Temperature::kUnknown);
Status FindTable(const ReadOptions& ro, const FileOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, TypedHandle**,
const MutableCFOptions& mutable_cf_options,
const bool no_io = false,
HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true,
size_t max_file_size_for_l0_meta_pin = 0,
Temperature file_temperature = Temperature::kUnknown);
// Get the table properties of a given table.
// @no_io: indicates if we should load table to the cache if it is not present
@ -194,19 +190,18 @@ class TableCache {
// @returns: `properties` will be reset on success. Please note that we will
// return Status::Incomplete() if table is not present in cache and
// we set `no_io` to be true.
Status GetTableProperties(
const FileOptions& toptions, const ReadOptions& read_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
std::shared_ptr<const TableProperties>* properties,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
bool no_io = false);
Status GetTableProperties(const FileOptions& toptions,
const ReadOptions& read_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
std::shared_ptr<const TableProperties>* properties,
const MutableCFOptions& mutable_cf_options,
bool no_io = false);
Status ApproximateKeyAnchors(const ReadOptions& ro,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
uint8_t block_protection_bytes_per_key,
const MutableCFOptions& mutable_cf_options,
std::vector<TableReader::Anchor>& anchors);
// Return total memory usage of the table reader of the file.
@ -214,25 +209,23 @@ class TableCache {
size_t GetMemoryUsageByTableReader(
const FileOptions& toptions, const ReadOptions& read_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr);
const FileMetaData& file_meta,
const MutableCFOptions& mutable_cf_options);
// Returns approximated offset of a key in a file represented by fd.
uint64_t ApproximateOffsetOf(
const ReadOptions& read_options, const Slice& key,
const FileMetaData& file_meta, TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr);
uint64_t ApproximateOffsetOf(const ReadOptions& read_options,
const Slice& key, const FileMetaData& file_meta,
TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
const MutableCFOptions& mutable_cf_options);
// Returns approximated data size between start and end keys in a file
// represented by fd (the start key must not be greater than the end key).
uint64_t ApproximateSize(
const ReadOptions& read_options, const Slice& start, const Slice& end,
const FileMetaData& file_meta, TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr);
uint64_t ApproximateSize(const ReadOptions& read_options, const Slice& start,
const Slice& end, const FileMetaData& file_meta,
TableReaderCaller caller,
const InternalKeyComparator& internal_comparator,
const MutableCFOptions& mutable_cf_options);
CacheInterface& get_cache() { return cache_; }
@ -250,17 +243,16 @@ class TableCache {
private:
// Build a table reader
Status GetTableReader(
const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, bool sequential_mode,
uint8_t block_protection_bytes_per_key, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true,
size_t max_file_size_for_l0_meta_pin = 0,
Temperature file_temperature = Temperature::kUnknown);
Status GetTableReader(const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, bool sequential_mode,
HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
const MutableCFOptions& mutable_cf_options,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true,
size_t max_file_size_for_l0_meta_pin = 0,
Temperature file_temperature = Temperature::kUnknown);
// Update the max_covering_tombstone_seq in the GetContext for each key based
// on the range deletions in the table

View File

@ -17,10 +17,8 @@ namespace ROCKSDB_NAMESPACE {
DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
(const ReadOptions& options, const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
uint8_t block_protection_bytes_per_key,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters, bool skip_range_deletions,
int level, TypedHandle* handle) {
const MutableCFOptions& mutable_cf_options, HistogramImpl* file_read_hist,
bool skip_filters, bool skip_range_deletions, int level, TypedHandle* handle) {
auto& fd = file_meta.fd;
Status s;
TableReader* t = fd.table_reader;
@ -72,7 +70,7 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
if (t == nullptr) {
assert(handle == nullptr);
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, block_protection_bytes_per_key, prefix_extractor,
&handle, mutable_cf_options,
options.read_tier == kBlockCacheTier /* no_io */,
file_read_hist, skip_filters, level,
true /* prefetch_index_and_filter_in_cache */,
@ -88,7 +86,8 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
}
if (s.ok()) {
CO_AWAIT(t->MultiGet)
(options, &table_range, prefix_extractor.get(), skip_filters);
(options, &table_range, mutable_cf_options.prefix_extractor.get(),
skip_filters);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
Status* status = iter->s;

View File

@ -1636,12 +1636,12 @@ class VersionBuilder::Rep {
return s;
}
Status LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key) {
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const MutableCFOptions& mutable_cf_options,
size_t max_file_size_for_l0_meta_pin,
const ReadOptions& read_options) {
assert(table_cache_ != nullptr);
assert(!track_found_and_missing_files_ || valid_version_available_);
@ -1716,7 +1716,7 @@ class VersionBuilder::Rep {
statuses[file_idx] = table_cache_->FindTable(
read_options, file_options_,
*(base_vstorage_->InternalComparator()), *file_meta, &handle,
block_protection_bytes_per_key, prefix_extractor, false /*no_io */,
mutable_cf_options, false /*no_io */,
internal_stats->GetFileReadHist(level), false, level,
prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin,
file_meta->temperature);
@ -1777,13 +1777,12 @@ Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) const {
Status VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key) {
return rep_->LoadTableHandlers(
internal_stats, max_threads, prefetch_index_and_filter_in_cache,
is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin,
read_options, block_protection_bytes_per_key);
const MutableCFOptions& mutable_cf_options,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options) {
return rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache,
is_initial_load, mutable_cf_options,
max_file_size_for_l0_meta_pin, read_options);
}
void VersionBuilder::CreateOrReplaceSavePoint() {
@ -1814,16 +1813,15 @@ Status VersionBuilder::SaveSavePointTo(VersionStorageInfo* vstorage) const {
Status VersionBuilder::LoadSavePointTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key) {
const MutableCFOptions& mutable_cf_options,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options) {
if (!savepoint_ || !savepoint_->ValidVersionAvailable()) {
return Status::InvalidArgument();
}
return savepoint_->LoadTableHandlers(
internal_stats, max_threads, prefetch_index_and_filter_in_cache,
is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin,
read_options, block_protection_bytes_per_key);
is_initial_load, mutable_cf_options, max_file_size_for_l0_meta_pin,
read_options);
}
void VersionBuilder::ClearSavePoint() { savepoint_.reset(nullptr); }

View File

@ -54,12 +54,12 @@ class VersionBuilder {
Status SaveTo(VersionStorageInfo* vstorage) const;
// Load all the table handlers for the current Version in the builder.
Status LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key);
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const MutableCFOptions& mutable_cf_options,
size_t max_file_size_for_l0_meta_pin,
const ReadOptions& read_options);
//============APIs only used by VersionEditHandlerPointInTime ============//
@ -99,12 +99,13 @@ class VersionBuilder {
// Load all the table handlers for the Version in the save point.
// Non-OK status will be returned if there is not a valid save point.
Status LoadSavePointTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
size_t max_file_size_for_l0_meta_pin, const ReadOptions& read_options,
uint8_t block_protection_bytes_per_key);
Status LoadSavePointTableHandlers(InternalStats* internal_stats,
int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const MutableCFOptions& mutable_cf_options,
size_t max_file_size_for_l0_meta_pin,
const ReadOptions& read_options);
void ClearSavePoint();

View File

@ -545,9 +545,8 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
Status s = builder->LoadTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
moptions->prefix_extractor, MaxFileSizeForL0MetaPin(*moptions),
read_options_, moptions->block_protection_bytes_per_key);
prefetch_index_and_filter_in_cache, is_initial_load, *moptions,
MaxFileSizeForL0MetaPin(*moptions), read_options_);
if ((s.IsPathNotFound() || s.IsCorruption()) && no_error_if_files_missing_) {
s = Status::OK();
}
@ -870,8 +869,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersionBeforeApplyEdit(
s = builder->LoadSavePointTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads, false, true,
cf_opts_ptr->prefix_extractor, MaxFileSizeForL0MetaPin(*cf_opts_ptr),
read_options_, cf_opts_ptr->block_protection_bytes_per_key);
*cf_opts_ptr, MaxFileSizeForL0MetaPin(*cf_opts_ptr), read_options_);
if (!s.ok()) {
delete version;
if (s.IsCorruption()) {

View File

@ -964,15 +964,15 @@ namespace {
class LevelIterator final : public InternalIterator {
public:
// @param read_options Must outlive this iterator.
// NOTE: many of the const& parameters are saved in this object (so
// must outlive this object)
LevelIterator(
TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options, const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const LevelFilesBrief* flevel, const MutableCFOptions& mutable_cf_options,
bool should_sample, HistogramImpl* file_read_hist,
TableReaderCaller caller, bool skip_filters, int level,
uint8_t block_protection_bytes_per_key, RangeDelAggregator* range_del_agg,
RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
nullptr,
bool allow_unprepared_value = false,
@ -984,7 +984,8 @@ class LevelIterator final : public InternalIterator {
icomparator_(icomparator),
user_comparator_(icomparator.user_comparator()),
flevel_(flevel),
prefix_extractor_(prefix_extractor),
mutable_cf_options_(mutable_cf_options),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
file_read_hist_(file_read_hist),
caller_(caller),
file_index_(flevel_->num_files),
@ -996,7 +997,6 @@ class LevelIterator final : public InternalIterator {
? read_options.snapshot->GetSequenceNumber()
: kMaxSequenceNumber),
level_(level),
block_protection_bytes_per_key_(block_protection_bytes_per_key),
should_sample_(should_sample),
skip_filters_(skip_filters),
allow_unprepared_value_(allow_unprepared_value),
@ -1147,12 +1147,12 @@ class LevelIterator final : public InternalIterator {
ClearRangeTombstoneIter();
return table_cache_->NewIterator(
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, prefix_extractor_,
range_del_agg_, mutable_cf_options_,
nullptr /* don't need reference to table */, file_read_hist_, caller_,
/*arena=*/nullptr, skip_filters_, level_,
/*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
largest_compaction_key, allow_unprepared_value_,
block_protection_bytes_per_key_, &read_seq_, range_tombstone_iter_);
largest_compaction_key, allow_unprepared_value_, &read_seq_,
range_tombstone_iter_);
}
// Check if current file being fully within iterate_lower_bound.
@ -1176,10 +1176,8 @@ class LevelIterator final : public InternalIterator {
const UserComparatorWrapper user_comparator_;
const LevelFilesBrief* flevel_;
mutable FileDescriptor current_value_;
// `prefix_extractor_` may be non-null even for total order seek. Checking
// this variable is not the right way to identify whether prefix iterator
// is used.
const std::shared_ptr<const SliceTransform>& prefix_extractor_;
const MutableCFOptions& mutable_cf_options_;
const SliceTransform* prefix_extractor_;
HistogramImpl* file_read_hist_;
TableReaderCaller caller_;
@ -1213,7 +1211,6 @@ class LevelIterator final : public InternalIterator {
SequenceNumber read_seq_;
int level_;
uint8_t block_protection_bytes_per_key_;
bool should_sample_;
bool skip_filters_;
bool allow_unprepared_value_;
@ -1580,8 +1577,7 @@ Status Version::GetTableProperties(const ReadOptions& read_options,
auto ioptions = cfd_->ioptions();
Status s = table_cache->GetTableProperties(
file_options_, read_options, cfd_->internal_comparator(), *file_meta, tp,
mutable_cf_options_.block_protection_bytes_per_key,
mutable_cf_options_.prefix_extractor, true /* no io */);
mutable_cf_options_, true /* no io */);
if (s.ok()) {
return s;
}
@ -1667,8 +1663,7 @@ Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
Status s = table_cache->GetRangeTombstoneIterator(
read_options, cfd_->internal_comparator(), *file_meta,
cfd_->GetLatestMutableCFOptions()->block_protection_bytes_per_key,
&tombstone_iter);
mutable_cf_options_, &tombstone_iter);
if (!s.ok()) {
return s;
}
@ -1785,9 +1780,7 @@ size_t Version::GetMemoryUsageByTableReaders(const ReadOptions& read_options) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
file_options_, read_options, cfd_->internal_comparator(),
*file_level.files[i].file_metadata,
mutable_cf_options_.block_protection_bytes_per_key,
mutable_cf_options_.prefix_extractor);
*file_level.files[i].file_metadata, mutable_cf_options_);
}
}
return total_usage;
@ -1936,10 +1929,9 @@ InternalIterator* Version::TEST_GetLevelIterator(
auto level_iter = new (mem) LevelIterator(
cfd_->table_cache(), read_options, file_options_,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
mutable_cf_options_.block_protection_bytes_per_key,
nullptr /* range_del_agg */, nullptr /* compaction_boundaries */,
allow_unprepared_value, &tombstone_iter_ptr);
if (read_options.ignore_range_deletions) {
@ -2044,14 +2036,12 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
auto table_iter = cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, /*range_del_agg=*/nullptr,
mutable_cf_options_.prefix_extractor, nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
*file.file_metadata, /*range_del_agg=*/nullptr, mutable_cf_options_,
nullptr, cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value,
mutable_cf_options_.block_protection_bytes_per_key,
/*range_del_read_seqno=*/nullptr, &tombstone_iter);
if (read_options.ignore_range_deletions) {
merge_iter_builder->AddIterator(table_iter);
@ -2078,10 +2068,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
auto level_iter = new (mem) LevelIterator(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
mutable_cf_options_.block_protection_bytes_per_key,
/*range_del_agg=*/nullptr,
/*compaction_boundaries=*/nullptr, allow_unprepared_value,
&tombstone_iter_ptr);
@ -2120,15 +2109,13 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
}
ScopedArenaPtr<InternalIterator> iter(cfd_->table_cache()->NewIterator(
read_options, file_options, cfd_->internal_comparator(),
*file->file_metadata, &range_del_agg,
mutable_cf_options_.prefix_extractor, nullptr,
*file->file_metadata, &range_del_agg, mutable_cf_options_, nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false,
mutable_cf_options_.block_protection_bytes_per_key));
/*allow_unprepared_value=*/false));
status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
iter.get(), overlap);
if (!status.ok() || *overlap) {
@ -2140,11 +2127,10 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
ScopedArenaPtr<InternalIterator> iter(new (mem) LevelIterator(
cfd_->table_cache(), read_options, file_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
mutable_cf_options_.block_protection_bytes_per_key, &range_del_agg,
nullptr, false));
&range_del_agg, nullptr, false));
status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
iter.get(), overlap);
}
@ -2457,8 +2443,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.block_protection_bytes_per_key,
mutable_cf_options_.prefix_extractor,
&get_context, mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
@ -2693,10 +2678,9 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (!skip_filters) {
Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_.prefix_extractor,
mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle,
mutable_cf_options_.block_protection_bytes_per_key);
fp.GetHitFileLevel(), &file_range, &table_handle);
skip_range_deletions = true;
if (status.ok()) {
skip_filters = true;
@ -2880,10 +2864,9 @@ Status Version::ProcessBatch(
if (!skip_filters) {
Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_.prefix_extractor,
mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle,
mutable_cf_options_.block_protection_bytes_per_key);
fp.GetHitFileLevel(), &file_range, &table_handle);
if (status.ok()) {
skip_filters = true;
skip_range_deletions = true;
@ -5545,10 +5528,8 @@ Status VersionSet::ProcessManifestWrites(
s = builder_guards[i]->version_builder()->LoadTableHandlers(
cfd->internal_stats(), 1 /* max_threads */,
true /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
mutable_cf_options_ptrs[i]->prefix_extractor,
MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]), read_options,
mutable_cf_options_ptrs[i]->block_protection_bytes_per_key);
false /* is_initial_load */, *mutable_cf_options_ptrs[i],
MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]), read_options);
if (!s.ok()) {
if (db_options_->paranoid_checks) {
break;
@ -6935,8 +6916,7 @@ uint64_t VersionSet::ApproximateOffsetOf(const ReadOptions& read_options,
const MutableCFOptions& cf_opts = v->GetMutableCFOptions();
if (table_cache != nullptr) {
result = table_cache->ApproximateOffsetOf(
read_options, key, *f.file_metadata, caller, icmp,
cf_opts.block_protection_bytes_per_key, cf_opts.prefix_extractor);
read_options, key, *f.file_metadata, caller, icmp, cf_opts);
}
}
return result;
@ -6977,9 +6957,8 @@ uint64_t VersionSet::ApproximateSize(const ReadOptions& read_options,
return 0;
}
const MutableCFOptions& cf_opts = v->GetMutableCFOptions();
return table_cache->ApproximateSize(
read_options, start, end, *f.file_metadata, caller, icmp,
cf_opts.block_protection_bytes_per_key, cf_opts.prefix_extractor);
return table_cache->ApproximateSize(read_options, start, end,
*f.file_metadata, caller, icmp, cf_opts);
}
void VersionSet::RemoveLiveFiles(
@ -7129,7 +7108,7 @@ InternalIterator* VersionSet::MakeInputIterator(
list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions,
cfd->internal_comparator(), fmd, range_del_agg,
c->mutable_cf_options()->prefix_extractor,
*c->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr,
@ -7139,7 +7118,6 @@ InternalIterator* VersionSet::MakeInputIterator(
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false,
c->mutable_cf_options()->block_protection_bytes_per_key,
/*range_del_read_seqno=*/nullptr,
/*range_del_iter=*/&range_tombstone_iter);
range_tombstones.emplace_back(std::move(range_tombstone_iter),
@ -7152,13 +7130,12 @@ InternalIterator* VersionSet::MakeInputIterator(
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), c->input_levels(which),
c->mutable_cf_options()->prefix_extractor,
*c->mutable_cf_options(),
/*should_sample=*/false,
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)),
c->mutable_cf_options()->block_protection_bytes_per_key,
range_del_agg, c->boundaries(which), false, &tombstone_iter_ptr);
/*level=*/static_cast<int>(c->level(which)), range_del_agg,
c->boundaries(which), false, &tombstone_iter_ptr);
range_tombstones.emplace_back(nullptr, tombstone_iter_ptr);
}
}
@ -7403,7 +7380,6 @@ Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
const MutableCFOptions* const cf_opts = cfd->GetLatestMutableCFOptions();
assert(cf_opts);
std::shared_ptr<const SliceTransform> pe = cf_opts->prefix_extractor;
size_t max_sz_for_l0_meta_pin = MaxFileSizeForL0MetaPin(*cf_opts);
const FileOptions& file_opts = file_options();
@ -7419,8 +7395,7 @@ Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
TableCache::TypedHandle* handle = nullptr;
FileMetaData meta_copy = meta;
status = table_cache->FindTable(
read_options, file_opts, *icmp, meta_copy, &handle,
cf_opts->block_protection_bytes_per_key, pe,
read_options, file_opts, *icmp, meta_copy, &handle, *cf_opts,
/*no_io=*/false, internal_stats->GetFileReadHist(level), false, level,
/*prefetch_index_and_filter_in_cache*/ false, max_sz_for_l0_meta_pin,
meta_copy.temperature);

View File

@ -25,8 +25,7 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
s = CO_AWAIT(table_cache_->MultiGet)(
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.block_protection_bytes_per_key,
mutable_cf_options_.prefix_extractor,
mutable_cf_options_,
cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
skip_range_deletions, hit_file_level, table_handle);
// TODO: examine the behavior for corrupted key

View File

@ -1182,6 +1182,9 @@ class VersionSetTestBase {
immutable_options_.fs = fs_;
immutable_options_.clock = env_->GetSystemClock().get();
cf_options_.table_factory = table_factory_;
mutable_cf_options_.table_factory = table_factory_;
versions_.reset(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,

View File

@ -2624,9 +2624,8 @@ class MemTableInserter : public WriteBatch::Handler {
// TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
ret_status.PermitUncheckedError();
return Status::NotSupported(
std::string("DeleteRange not supported for table type ") +
cfd->ioptions()->table_factory->Name() + " in CF " +
cfd->GetName());
std::string("CF " + cfd->GetName() +
" reports it does not support DeleteRange"));
}
int cmp =
cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key);

View File

@ -79,6 +79,8 @@ class Configurable {
}
template <typename T>
T* GetOptions(const std::string& name) {
// FIXME: Is this sometimes reading a raw pointer from a shared_ptr,
// unsafely relying on the object layout?
return reinterpret_cast<T*>(const_cast<void*>(GetOptionsPtr(name)));
}
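
For reference, this template is how call sites (including the table-factory parsing lambdas later in this diff) reach a concrete options struct from a factory. A minimal sketch of that call pattern; the helper below is illustrative only and not part of this change:

#include <cstddef>
#include <memory>
#include "rocksdb/table.h"

// Illustrative helper (not part of this change): returns the configured
// block size if `factory` is block-based, else 0. GetOptions<T>() hands
// back a raw pointer into storage owned by the factory, which is the
// aliasing concern noted in the FIXME above.
size_t GetConfiguredBlockSize(
    const std::shared_ptr<ROCKSDB_NAMESPACE::TableFactory>& factory) {
  auto* bbto =
      factory->GetOptions<ROCKSDB_NAMESPACE::BlockBasedTableOptions>();
  return bbto != nullptr ? bbto->block_size : 0;
}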

View File

@ -266,6 +266,75 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableCFOptions, disable_auto_compactions),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"table_factory", OptionTypeInfo::AsCustomSharedPtr<TableFactory>(
offsetof(struct MutableCFOptions, table_factory),
OptionVerificationType::kByName,
(OptionTypeFlags::kCompareLoose |
OptionTypeFlags::kStringNameOnly |
OptionTypeFlags::kDontPrepare))},
{"block_based_table_factory",
{offsetof(struct MutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a BlockBasedTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
BlockBasedTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts =
table_factory->get()->GetOptions<BlockBasedTableOptions>();
}
if (name == "block_based_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewBlockBasedTableFactory(*old_opts));
} else {
new_factory.reset(NewBlockBasedTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"plain_table_factory",
{offsetof(struct MutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a PlainTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
PlainTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts = table_factory->get()->GetOptions<PlainTableOptions>();
}
if (name == "plain_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewPlainTableFactory(*old_opts));
} else {
new_factory.reset(NewPlainTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
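
These two entries, together with the plain `table_factory` entry above, mirror the ones removed from the ImmutableCFOptions table further down, so the string-based options parser keeps resolving nested table options now that the factory lives in MutableCFOptions. A hedged sketch of the syntax those lambdas handle; the option values are purely illustrative:

#include <cassert>
#include "rocksdb/convenience.h"
#include "rocksdb/options.h"

// Sketch only: the nested "block_based_table_factory={...}" form is parsed
// by the lambda registered above; the values here are illustrative.
void ParseNestedTableOptionsExample() {
  ROCKSDB_NAMESPACE::ConfigOptions config_options;
  ROCKSDB_NAMESPACE::ColumnFamilyOptions base, parsed;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::GetColumnFamilyOptionsFromString(
          config_options, base,
          "block_based_table_factory="
          "{block_size=16384;cache_index_and_filter_blocks=true}",
          &parsed);
  assert(s.ok());
}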
@ -713,76 +782,6 @@ static std::unordered_map<std::string, OptionTypeInfo>
MemTableRepFactory::CreateFromString(opts, value, shared);
return s;
}}},
{"table_factory",
OptionTypeInfo::AsCustomSharedPtr<TableFactory>(
offsetof(struct ImmutableCFOptions, table_factory),
OptionVerificationType::kByName,
(OptionTypeFlags::kCompareLoose |
OptionTypeFlags::kStringNameOnly |
OptionTypeFlags::kDontPrepare))},
{"block_based_table_factory",
{offsetof(struct ImmutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a BlockBasedTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
BlockBasedTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts =
table_factory->get()->GetOptions<BlockBasedTableOptions>();
}
if (name == "block_based_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewBlockBasedTableFactory(*old_opts));
} else {
new_factory.reset(NewBlockBasedTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"plain_table_factory",
{offsetof(struct ImmutableCFOptions, table_factory),
OptionType::kCustomizable, OptionVerificationType::kAlias,
OptionTypeFlags::kShared | OptionTypeFlags::kCompareLoose,
// Parses the input value and creates a PlainTableFactory
[](const ConfigOptions& opts, const std::string& name,
const std::string& value, void* addr) {
PlainTableOptions* old_opts = nullptr;
auto table_factory =
static_cast<std::shared_ptr<TableFactory>*>(addr);
if (table_factory->get() != nullptr) {
old_opts = table_factory->get()->GetOptions<PlainTableOptions>();
}
if (name == "plain_table_factory") {
std::unique_ptr<TableFactory> new_factory;
if (old_opts != nullptr) {
new_factory.reset(NewPlainTableFactory(*old_opts));
} else {
new_factory.reset(NewPlainTableFactory());
}
Status s = new_factory->ConfigureFromString(opts, value);
if (s.ok()) {
table_factory->reset(new_factory.release());
}
return s;
} else if (old_opts != nullptr) {
return table_factory->get()->ConfigureOption(opts, name, value);
} else {
return Status::NotFound("Mismatched table option: ", name);
}
}}},
{"table_properties_collectors",
OptionTypeInfo::Vector<
std::shared_ptr<TablePropertiesCollectorFactory>>(
@ -954,7 +953,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options)
inplace_update_support(cf_options.inplace_update_support),
inplace_callback(cf_options.inplace_callback),
memtable_factory(cf_options.memtable_factory),
table_factory(cf_options.table_factory),
table_properties_collector_factories(
cf_options.table_properties_collector_factories),
bloom_locality(cf_options.bloom_locality),

View File

@ -53,8 +53,6 @@ struct ImmutableCFOptions {
std::shared_ptr<MemTableRepFactory> memtable_factory;
std::shared_ptr<TableFactory> table_factory;
Options::TablePropertiesCollectorFactories
table_properties_collector_factories;
@ -124,6 +122,7 @@ struct MutableCFOptions {
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions),
table_factory(options.table_factory),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
hard_pending_compaction_bytes_limit(
@ -258,6 +257,9 @@ struct MutableCFOptions {
size_t max_successive_merges;
bool strict_max_successive_merges;
size_t inplace_update_num_locks;
// NOTE: if too many shared_ptr make their way into MutableCFOptions, the
// copy performance might suffer enough to warrant aggregating them in an
// immutable+copy-on-write sub-object managed through a single shared_ptr.
std::shared_ptr<const SliceTransform> prefix_extractor;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
@ -278,6 +280,7 @@ struct MutableCFOptions {
// Compaction related options
bool disable_auto_compactions;
std::shared_ptr<TableFactory> table_factory;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger;
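
Since the factory is now held here through a shared_ptr, producing a variant with one BBTO field changed amounts to copying the struct and resetting that pointer, while existing readers keep using the factory they already hold. A hypothetical sketch under that assumption; the helper name and the mutated field are illustrative, and installing the copy through the usual SuperVersion path is not shown:

#include "options/cf_options.h"
#include "rocksdb/table.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical helper, not part of this change: copy MutableCFOptions and
// give the copy a fresh BlockBasedTableFactory with block_size altered.
// The BBTO copy carries shared_ptr members such as block_cache unchanged.
MutableCFOptions WithNewBlockSize(const MutableCFOptions& base,
                                  size_t new_block_size) {
  MutableCFOptions copy = base;  // copies the table_factory shared_ptr
  auto* bbto = copy.table_factory->GetOptions<BlockBasedTableOptions>();
  if (bbto != nullptr) {
    BlockBasedTableOptions modified = *bbto;
    modified.block_size = new_block_size;
    copy.table_factory.reset(NewBlockBasedTableFactory(modified));
  }
  return copy;
}

}  // namespace ROCKSDB_NAMESPACE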

View File

@ -239,6 +239,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
// Compaction related options
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
cf_opts->table_factory = moptions.table_factory;
cf_opts->soft_pending_compaction_bytes_limit =
moptions.soft_pending_compaction_bytes_limit;
cf_opts->hard_pending_compaction_bytes_limit =
@ -315,7 +316,6 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
cf_opts->inplace_update_support = ioptions.inplace_update_support;
cf_opts->inplace_callback = ioptions.inplace_callback;
cf_opts->memtable_factory = ioptions.memtable_factory;
cf_opts->table_factory = ioptions.table_factory;
cf_opts->table_properties_collector_factories =
ioptions.table_properties_collector_factories;
cf_opts->bloom_locality = ioptions.bloom_locality;

View File

@ -555,7 +555,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
std::string column_family_name;
const ReadOptions read_options;
const WriteOptions write_options;
builder.reset(ioptions.table_factory->NewTableBuilder(
builder.reset(moptions.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, read_options, write_options, internal_comparator,
&internal_tbl_prop_coll_factories, options.compression,
@ -581,7 +581,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
file_reader.reset(new RandomAccessFileReader(std::move(file), "test"));
const bool kSkipFilters = true;
const bool kImmortal = true;
ASSERT_OK(ioptions.table_factory->NewTableReader(
ASSERT_OK(moptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor, soptions,
internal_comparator,
0 /* block_protection_bytes_per_key */, !kSkipFilters,

View File

@ -416,7 +416,7 @@ Status SstFileWriter::Open(const std::string& file_path, Temperature temp) {
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
r->builder.reset(r->mutable_cf_options.table_factory->NewTableBuilder(
table_builder_options, r->file_writer.get()));
r->file_info = ExternalSstFileInfo();

View File

@ -379,7 +379,7 @@ class TableConstructor : public Constructor {
std::string column_family_name;
const ReadOptions read_options;
const WriteOptions write_options;
builder.reset(ioptions.table_factory->NewTableBuilder(
builder.reset(moptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, read_options, write_options,
internal_comparator,
&internal_tbl_prop_coll_factories,
@ -440,7 +440,7 @@ class TableConstructor : public Constructor {
TEST_GetSink()->contents(), file_num_, ioptions.allow_mmap_reads));
file_reader_.reset(new RandomAccessFileReader(std::move(source), "test"));
return ioptions.table_factory->NewTableReader(
return moptions.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor, soptions,
*last_internal_comparator_,
0 /* block_protection_bytes_per_key */,
@ -4460,7 +4460,7 @@ TEST_P(BlockBasedTableTest, NoFileChecksum) {
std::unique_ptr<TableBuilder> builder;
const ReadOptions read_options;
const WriteOptions write_options;
builder.reset(ioptions.table_factory->NewTableBuilder(
builder.reset(moptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, read_options, write_options,
*comparator, &internal_tbl_prop_coll_factories,
options.compression, options.compression_opts,
@ -4498,7 +4498,7 @@ TEST_P(BlockBasedTableTest, Crc32cFileChecksum) {
std::unique_ptr<TableBuilder> builder;
const ReadOptions read_options;
const WriteOptions write_options;
builder.reset(ioptions.table_factory->NewTableBuilder(
builder.reset(moptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, read_options, write_options,
*comparator, &internal_tbl_prop_coll_factories,
options.compression, options.compression_opts,
@ -5491,7 +5491,7 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
ImmutableOptions ioptions2(options2);
const MutableCFOptions moptions2(options2);
ASSERT_OK(ioptions.table_factory->NewTableReader(
ASSERT_OK(moptions.table_factory->NewTableReader(
TableReaderOptions(ioptions2, moptions2.prefix_extractor, EnvOptions(),
GetPlainInternalComparator(options2.comparator),
0 /* block_protection_bytes_per_key */),

View File

@ -34,15 +34,6 @@ class MemoryTest : public testing::Test {
}
}
void GetCachePointersFromTableFactory(
const TableFactory* factory,
std::unordered_set<const Cache*>* cache_set) {
const auto bbto = factory->GetOptions<BlockBasedTableOptions>();
if (bbto != nullptr) {
cache_set->insert(bbto->block_cache.get());
}
}
void GetCachePointers(const std::vector<DB*>& dbs,
std::unordered_set<const Cache*>* cache_set) {
cache_set->clear();
@ -61,13 +52,8 @@ class MemoryTest : public testing::Test {
cache_set->insert(db->GetDBOptions().row_cache.get());
// Cache from table factories
std::unordered_map<std::string, const ImmutableCFOptions*> iopts_map;
if (db_impl != nullptr) {
ASSERT_OK(db_impl->TEST_GetAllImmutableCFOptions(&iopts_map));
}
for (const auto& pair : iopts_map) {
GetCachePointersFromTableFactory(pair.second->table_factory.get(),
cache_set);
db_impl->TEST_GetAllBlockCaches(cache_set);
}
}
}
@ -266,4 +252,3 @@ int main(int argc, char** argv) {
return 0;
#endif
}