mirror of https://github.com/facebook/rocksdb.git
Eliminate unnecessary (slow) block cache Ref()ing in MultiGet (#9899)
Summary: When MultiGet() determines that multiple query keys can be served by examining the same data block in block cache (one Lookup()), each PinnableSlice referring to data in that data block needs to hold on to the block in cache so that they can be released at arbitrary times by the API user. Historically this was accomplished with extra calls to Ref() on the Handle from Lookup(), with each PinnableSlice cleanup calling Release() on the Handle, but this creates extra contention on the block cache for the extra Ref()s and Release()s, especially because they hit the same cache shard repeatedly.

In the case of merge operands (possibly more cases?), the problem was compounded by doing an extra Ref()+eventual Release() for each merge operand for a key reusing a block (which could be the same key!), rather than one Ref() per key. (Note: the non-shared case with `biter` was already one per key.)

This change optimizes MultiGet not to rely on these extra, contentious Ref()+Release() calls by instead, in the shared block case, wrapping the cache Release() cleanup in a refcounted object referenced by the PinnableSlices, such that after the last wrapped reference is released, the cache entry is Release()ed. Relaxed atomic refcounts should be much faster than mutex-guarded Ref() and Release(), and much less prone to a performance cliff when MultiGet() does a lot of block sharing.

Note that I did not use std::shared_ptr, because that would require an extra indirection object (shared_ptr itself new/delete) in order to associate a ref increment/decrement with a Cleanable cleanup entry. (If I assumed it was the size of two pointers, I could do some hackery to make it work without the extra indirection, but that's too fragile.)

Some details:
* Fixed (removed) extra block cache tracing entries in cases of cache entry reuse in MultiGet, but it's likely that in some other cases traces are missing (XXX comment inserted)
* Moved existing implementations for cleanable.h from iterator.cc to new cleanable.cc
* Improved API comments on Cleanable
* Added a public SharedCleanablePtr class to cleanable.h in case others could benefit from the same pattern (potentially many Cleanables and/or smart pointers referencing a shared Cleanable)
* Added a typedef for MultiGetContext::Mask
* Some variable renaming for clarity

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9899

Test Plan: Added unit tests for SharedCleanablePtr.

Greatly enhanced ability of existing tests to detect cache use-after-free:
* Release PinnableSlices from MultiGet as they are read rather than in bulk (in db_test_util wrapper).
* In ASAN build, default to using a trivially small LRUCache for block_cache so that entries are immediately erased when unreferenced. (Updated two tests that depend on caching.) New ASAN testsuite running time seems OK to me.

If I introduce a bug into my implementation where we skip the shared cleanups on block reuse, ASAN detects the bug in `db_basic_test *MultiGet*`. If I remove either of the above testing enhancements, the bug is not detected.

Consider for follow-up work: manipulate or randomize ordering of PinnableSlice use and release from MultiGet db_test_util wrapper. But in typical cases, natural ordering gives pretty good functional coverage.

Performance test: In the extreme (but possible) case of MultiGetting the same or adjacent keys in a batch, throughput can improve by an order of magnitude.
`./db_bench -benchmarks=multireadrandom -db=/dev/shm/testdb -readonly -num=5 -duration=10 -threads=20 -multiread_batched -batch_size=200`

Before ops/sec, num=5: 1,384,394
Before ops/sec, num=500: 6,423,720
After ops/sec, num=500: 10,658,794
After ops/sec, num=5: 16,027,257

Also note that previously, with high parallelism, having query keys concentrated in a single block was worse than spreading them out a bit. Now concentrated in a single block is faster than spread out, which is hopefully consistent with natural expectation.

Random query performance: with num=1000000, over 999 x 10s runs running before & after simultaneously (each -threads=12):

Before: multireadrandom [AVG 999 runs] : 1088699 (± 7344) ops/sec; 120.4 (± 0.8) MB/sec
After: multireadrandom [AVG 999 runs] : 1090402 (± 7230) ops/sec; 120.6 (± 0.8) MB/sec

Possibly better, possibly in the noise.

Reviewed By: anand1976

Differential Revision: D35907003

Pulled By: pdillinger

fbshipit-source-id: bbd244d703649a8ca12d476f2d03853ed9d1a17e
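To make the refcounted-cleanup idea above concrete, here is a minimal sketch (not the actual MultiGet code; ReleaseBlock, the cache/handle pointers, and the two-result function are illustrative stand-ins) of how one cache release can be shared by several PinnableSlices through the SharedCleanablePtr introduced in this change:

#include "rocksdb/cleanable.h"
#include "rocksdb/slice.h"

using ROCKSDB_NAMESPACE::Cleanable;
using ROCKSDB_NAMESPACE::PinnableSlice;
using ROCKSDB_NAMESPACE::SharedCleanablePtr;
using ROCKSDB_NAMESPACE::Slice;

// Hypothetical cleanup standing in for cache->Release(handle).
static void ReleaseBlock(void* cache, void* handle) {
  (void)cache;
  (void)handle;
}

// Two results served from the same cached block share one release.
void ShareOneBlockAcrossResults(void* cache, void* handle, const Slice& v1,
                                const Slice& v2, PinnableSlice* result1,
                                PinnableSlice* result2) {
  // One refcounted wrapper owns the single cache release.
  SharedCleanablePtr shared;
  shared.Allocate();
  shared->RegisterCleanup(&ReleaseBlock, cache, handle);

  // Each result takes a cheap (relaxed atomic) reference to the wrapper
  // instead of a contended block cache Ref().
  Cleanable pin1, pin2;
  shared.RegisterCopyWith(&pin1);
  shared.RegisterCopyWith(&pin2);
  result1->PinSlice(v1, &pin1);  // delegates pin1's reference to result1
  result2->PinSlice(v2, &pin2);  // delegates pin2's reference to result2

  // Once `shared` goes out of scope and the caller releases result1 and
  // result2, the refcount reaches zero and ReleaseBlock runs exactly once.
}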
parent ce2d8a4239
commit 9d0cae7104
CMakeLists.txt
@@ -796,6 +796,7 @@ set(SOURCES
         trace_replay/trace_record_result.cc
         trace_replay/trace_record.cc
         trace_replay/trace_replay.cc
+        util/cleanable.cc
         util/coding.cc
         util/compaction_job_stats_impl.cc
         util/comparator.cc

HISTORY.md
@@ -14,6 +14,7 @@

 ### Bug Fixes
 * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
+* Fixed unnecessary block cache contention when queries within a MultiGet batch and across parallel batches access the same data block, which previously could cause severely degraded performance in this unusual case. (In more typical MultiGet cases, this fix is expected to yield a small or negligible performance improvement.)

 ## 7.2.0 (04/15/2022)
 ### Bug Fixes

TARGETS
@@ -224,6 +224,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
     "trace_replay/trace_record_result.cc",
     "trace_replay/trace_replay.cc",
     "util/build_version.cc",
+    "util/cleanable.cc",
     "util/coding.cc",
     "util/compaction_job_stats_impl.cc",
     "util/comparator.cc",
@@ -543,6 +544,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
     "trace_replay/trace_record_result.cc",
     "trace_replay/trace_replay.cc",
     "util/build_version.cc",
+    "util/cleanable.cc",
     "util/coding.cc",
     "util/compaction_job_stats_impl.cc",
     "util/comparator.cc",

db/db_bloom_filter_test.cc
@@ -111,6 +111,7 @@ TEST_P(DBBloomFilterTestDefFormatVersion, KeyMayExist) {
   options_override.filter_policy = Create(20, bfp_impl_);
   options_override.partition_filters = partition_filters_;
   options_override.metadata_block_size = 32;
+  options_override.full_block_cache = true;
   Options options = CurrentOptions(options_override);
   if (partition_filters_) {
     auto* table_options =

db/db_iterator_test.cc
@@ -111,9 +111,12 @@ TEST_P(DBIteratorTest, PersistedTierOnIterator) {
 TEST_P(DBIteratorTest, NonBlockingIteration) {
   do {
     ReadOptions non_blocking_opts, regular_opts;
-    Options options = CurrentOptions();
+    anon::OptionsOverride options_override;
+    options_override.full_block_cache = true;
+    Options options = CurrentOptions(options_override);
     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
     non_blocking_opts.read_tier = kBlockCacheTier;

     CreateAndReopenWithCF({"pikachu"}, options);
     // write one kv to the database.
     ASSERT_OK(Put(1, "a", "b"));

db/db_test_util.cc
@@ -11,6 +11,8 @@
 #include "db/forward_iterator.h"
 #include "env/mock_env.h"
+#include "port/lang.h"
+#include "rocksdb/cache.h"
 #include "rocksdb/convenience.h"
 #include "rocksdb/env_encryption.h"
 #include "rocksdb/unique_id.h"
@@ -360,6 +362,17 @@ Options DBTestBase::GetOptions(
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
       "NewWritableFile:O_DIRECT");
 #endif
+  // kMustFreeHeapAllocations -> indicates ASAN build
+  if (kMustFreeHeapAllocations && !options_override.full_block_cache) {
+    // Detecting block cache use-after-free is normally difficult in unit
+    // tests, because as a cache, it tends to keep unreferenced entries in
+    // memory, and we normally want unit tests to take advantage of block
+    // cache for speed. However, we also want a strong chance of detecting
+    // block cache use-after-free in unit tests in ASAN builds, so for ASAN
+    // builds we use a trivially small block cache to which entries can be
+    // added but are immediately freed on no more references.
+    table_options.block_cache = NewLRUCache(/* too small */ 1);
+  }

   bool can_allow_mmap = IsMemoryMappedAccessSupported();
   switch (option_config) {
@@ -831,7 +844,7 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
   std::vector<Status> s;
   if (!batched) {
     s = db_->MultiGet(options, handles, keys, &result);
-    for (unsigned int i = 0; i < s.size(); ++i) {
+    for (size_t i = 0; i < s.size(); ++i) {
       if (s[i].IsNotFound()) {
        result[i] = "NOT_FOUND";
      } else if (!s[i].ok()) {
@@ -844,13 +857,16 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
     s.resize(cfs.size());
     db_->MultiGet(options, cfs.size(), handles.data(), keys.data(),
                   pin_values.data(), s.data());
-    for (unsigned int i = 0; i < s.size(); ++i) {
+    for (size_t i = 0; i < s.size(); ++i) {
       if (s[i].IsNotFound()) {
         result[i] = "NOT_FOUND";
       } else if (!s[i].ok()) {
         result[i] = s[i].ToString();
       } else {
         result[i].assign(pin_values[i].data(), pin_values[i].size());
+        // Increase likelihood of detecting potential use-after-free bugs with
+        // PinnableSlices tracking the same resource
+        pin_values[i].Reset();
       }
     }
   }
@@ -863,23 +879,25 @@ std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
   options.verify_checksums = true;
   options.snapshot = snapshot;
   std::vector<Slice> keys;
-  std::vector<std::string> result;
+  std::vector<std::string> result(k.size());
   std::vector<Status> statuses(k.size());
   std::vector<PinnableSlice> pin_values(k.size());

-  for (unsigned int i = 0; i < k.size(); ++i) {
+  for (size_t i = 0; i < k.size(); ++i) {
     keys.push_back(k[i]);
   }
   db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
                 keys.data(), pin_values.data(), statuses.data());
-  result.resize(k.size());
-  for (auto iter = result.begin(); iter != result.end(); ++iter) {
-    iter->assign(pin_values[iter - result.begin()].data(),
-                 pin_values[iter - result.begin()].size());
-  }
-  for (unsigned int i = 0; i < statuses.size(); ++i) {
+  for (size_t i = 0; i < statuses.size(); ++i) {
     if (statuses[i].IsNotFound()) {
       result[i] = "NOT_FOUND";
+    } else if (!statuses[i].ok()) {
+      result[i] = statuses[i].ToString();
+    } else {
+      result[i].assign(pin_values[i].data(), pin_values[i].size());
+      // Increase likelihood of detecting potential use-after-free bugs with
+      // PinnableSlices tracking the same resource
+      pin_values[i].Reset();
     }
   }
   return result;

db/db_test_util.h
@@ -104,6 +104,9 @@ struct OptionsOverride {
   std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
   // These will be used only if filter_policy is set
   bool partition_filters = false;
+  // Force using a default block cache. (Setting to false allows ASAN build
+  // use a trivially small block cache for better UAF error detection.)
+  bool full_block_cache = false;
   uint64_t metadata_block_size = 1024;

   // Used as a bit mask of individual enums in which to skip an XF test point

include/rocksdb/cleanable.h
@@ -19,11 +19,12 @@ class Cleanable {
   Cleanable(Cleanable&) = delete;
   Cleanable& operator=(Cleanable&) = delete;

+  // Executes all the registered cleanups
   ~Cleanable();

   // Move constructor and move assignment is allowed.
-  Cleanable(Cleanable&&);
-  Cleanable& operator=(Cleanable&&);
+  Cleanable(Cleanable&&) noexcept;
+  Cleanable& operator=(Cleanable&&) noexcept;

   // Clients are allowed to register function/arg1/arg2 triples that
   // will be invoked when this iterator is destroyed.
@@ -31,8 +32,14 @@ class Cleanable {
   // Note that unlike all of the preceding methods, this method is
   // not abstract and therefore clients should not override it.
   using CleanupFunction = void (*)(void* arg1, void* arg2);

+  // Add another Cleanup to the list
   void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

+  // Move the cleanups owned by this Cleanable to another Cleanable, adding to
+  // any existing cleanups it has
   void DelegateCleanupsTo(Cleanable* other);

   // DoCleanup and also resets the pointers for reuse
   inline void Reset() {
     DoCleanup();
@@ -40,6 +47,8 @@ class Cleanable {
     cleanup_.next = nullptr;
   }

+  inline bool HasCleanups() { return cleanup_.function != nullptr; }
+
  protected:
   struct Cleanup {
     CleanupFunction function;
@@ -68,4 +77,52 @@ class Cleanable {
   }
 };

+// A copyable, reference-counted pointer to a simple Cleanable that only
+// performs registered cleanups after all copies are destroy. This is like
+// shared_ptr<Cleanable> but works more efficiently with wrapping the pointer
+// in an outer Cleanable (see RegisterCopyWith() and MoveAsCleanupTo()).
+// WARNING: if you create a reference cycle, for example:
+//   SharedCleanablePtr scp;
+//   scp.Allocate();
+//   scp.RegisterCopyWith(&*scp);
+// It will prevent cleanups from ever happening!
+class SharedCleanablePtr {
+ public:
+  // Empy/null pointer
+  SharedCleanablePtr() {}
+  // Copy and move constructors and assignment
+  SharedCleanablePtr(const SharedCleanablePtr& from);
+  SharedCleanablePtr(SharedCleanablePtr&& from) noexcept;
+  SharedCleanablePtr& operator=(const SharedCleanablePtr& from);
+  SharedCleanablePtr& operator=(SharedCleanablePtr&& from) noexcept;
+  // Destructor (decrement refcount if non-null)
+  ~SharedCleanablePtr();
+  // Create a new simple Cleanable and make this assign this pointer to it.
+  // (Reset()s first if necessary.)
+  void Allocate();
+  // Reset to empty/null (decrement refcount if previously non-null)
+  void Reset();
+  // Dereference to pointed-to Cleanable
+  Cleanable& operator*();
+  Cleanable* operator->();
+  // Get as raw pointer to Cleanable
+  Cleanable* get();
+
+  // Creates a (virtual) copy of this SharedCleanablePtr and registers its
+  // destruction with target, so that the cleanups registered with the
+  // Cleanable pointed to by this can only happen after the cleanups in the
+  // target Cleanable are run.
+  // No-op if this is empty (nullptr).
+  void RegisterCopyWith(Cleanable* target);
+
+  // Moves (virtually) this shared pointer to a new cleanup in the target.
+  // This is essentilly a move semantics version of RegisterCopyWith(), for
+  // performance optimization. No-op if this is empty (nullptr).
+  void MoveAsCleanupTo(Cleanable* target);
+
+ private:
+  struct Impl;
+  Impl* ptr_ = nullptr;
+};
+
 }  // namespace ROCKSDB_NAMESPACE

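A small usage sketch of the two transfer methods declared above (illustrative only; the counter-based cleanup stands in for a real resource release): RegisterCopyWith() leaves the pointer usable and adds a reference, while MoveAsCleanupTo() hands the pointer's own reference to the target and leaves the pointer null.

#include <cassert>

#include "rocksdb/cleanable.h"

using ROCKSDB_NAMESPACE::Cleanable;
using ROCKSDB_NAMESPACE::SharedCleanablePtr;

static void Decrement(void* counter, void* /*unused*/) {
  --*static_cast<int*>(counter);
}

void SharedCleanableTransferExample() {
  int pending = 1;  // becomes 0 once the shared cleanup has run

  SharedCleanablePtr shared;
  shared.Allocate();
  shared->RegisterCleanup(&Decrement, &pending, nullptr);

  {
    Cleanable a, b;
    // Copy: `shared` stays valid; `a` now holds an extra reference.
    shared.RegisterCopyWith(&a);
    // Move: `b` takes over `shared`'s own reference; `shared` becomes null.
    shared.MoveAsCleanupTo(&b);
    assert(shared.get() == nullptr);
    assert(pending == 1);  // two references (a and b) still outstanding
  }  // a and b run their cleanups here, releasing the last references

  assert(pending == 0);  // Decrement ran exactly once
}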
src.mk
@@ -211,6 +211,7 @@ LIB_SOURCES = \
   trace_replay/block_cache_tracer.cc \
   trace_replay/io_tracer.cc \
   util/build_version.cc \
+  util/cleanable.cc \
   util/coding.cc \
   util/compaction_job_stats_impl.cc \
   util/comparator.cc \

table/block_based/block_based_table_reader.cc
@@ -117,14 +117,6 @@ Status ReadBlockFromFile(
   return s;
 }

-// Release the cached entry and decrement its ref count.
-// Do not force erase
-void ReleaseCachedEntry(void* arg, void* h) {
-  Cache* cache = reinterpret_cast<Cache*>(arg);
-  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
-  cache->Release(handle, false /* erase_if_last_ref */);
-}
-
 // For hash based index, return false if table_properties->prefix_extractor_name
 // and prefix_extractor both exist and match, otherwise true.
 inline bool PrefixExtractorChangedHelper(
@@ -2570,10 +2562,11 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
       iiter_unique_ptr.reset(iiter);
     }

-    uint64_t offset = std::numeric_limits<uint64_t>::max();
+    uint64_t prev_offset = std::numeric_limits<uint64_t>::max();
     autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
     autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
     autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
+    MultiGetContext::Mask reused_mask = 0;
     char stack_buf[kMultiGetReadStackBufSize];
     std::unique_ptr<char[]> block_buf;
     {
@@ -2635,16 +2628,19 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,

         statuses.emplace_back();
         results.emplace_back();
-        if (v.handle.offset() == offset) {
-          // We're going to reuse the block for this key later on. No need to
-          // look it up now. Place a null handle
+        if (v.handle.offset() == prev_offset) {
+          // This key can reuse the previous block (later on).
+          // Mark previous as "reused"
+          reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1);
+          // Use null handle to indicate this one reuses same block as
+          // previous.
           block_handles.emplace_back(BlockHandle::NullBlockHandle());
           continue;
         }
         // Lookup the cache for the given data block referenced by an index
         // iterator value (i.e BlockHandle). If it exists in the cache,
         // initialize block to the contents of the data block.
-        offset = v.handle.offset();
+        prev_offset = v.handle.offset();
         BlockHandle handle = v.handle;
         BlockCacheLookupContext lookup_data_block_context(
             TableReaderCaller::kUserMultiGet);
@@ -2748,6 +2744,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
     DataBlockIter first_biter;
     DataBlockIter next_biter;
     size_t idx_in_batch = 0;
+    SharedCleanablePtr shared_cleanable;
     for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
          ++miter) {
       Status s;
@@ -2758,7 +2755,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
       bool first_block = true;
       do {
         DataBlockIter* biter = nullptr;
-        bool reusing_block = true;
+        bool reusing_prev_block;
+        bool later_reused;
         uint64_t referenced_data_size = 0;
         bool does_referenced_key_exist = false;
         BlockCacheLookupContext lookup_data_block_context(
@@ -2772,13 +2770,16 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
             NewDataBlockIterator<DataBlockIter>(
                 read_options, results[idx_in_batch], &first_biter,
                 statuses[idx_in_batch]);
-            reusing_block = false;
+            reusing_prev_block = false;
           } else {
             // If handler is null and result is empty, then the status is never
             // set, which should be the initial value: ok().
             assert(statuses[idx_in_batch].ok());
+            reusing_prev_block = true;
           }
           biter = &first_biter;
+          later_reused =
+              (reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0;
           idx_in_batch++;
         } else {
           IndexValue v = iiter->value();
@@ -2798,7 +2799,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
               BlockType::kData, get_context, &lookup_data_block_context,
               Status(), nullptr);
           biter = &next_biter;
-          reusing_block = false;
+          reusing_prev_block = false;
+          later_reused = false;
         }

         if (read_options.read_tier == kBlockCacheTier &&
@@ -2823,28 +2825,56 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
           break;
         }

+        // Reusing blocks complicates pinning/Cleanable, because the cache
+        // entry referenced by biter can only be released once all returned
+        // pinned values are released. This code previously did an extra
+        // block_cache Ref for each reuse, but that unnecessarily increases
+        // block cache contention. Instead we can use a variant of shared_ptr
+        // to release in block cache only once.
+        //
+        // Although the biter loop below might SaveValue multiple times for
+        // merges, just one value_pinner suffices, as MultiGet will merge
+        // the operands before returning to the API user.
+        Cleanable* value_pinner;
+        if (biter->IsValuePinned()) {
+          if (reusing_prev_block) {
+            // Note that we don't yet know if the MultiGet results will need
+            // to pin this block, so we might wrap a block for sharing and
+            // still end up with 1 (or 0) pinning ref. Not ideal but OK.
+            //
+            // Here we avoid adding redundant cleanups if we didn't end up
+            // delegating the cleanup from last time around.
+            if (!biter->HasCleanups()) {
+              assert(shared_cleanable.get());
+              if (later_reused) {
+                shared_cleanable.RegisterCopyWith(biter);
+              } else {
+                shared_cleanable.MoveAsCleanupTo(biter);
+              }
+            }
+          } else if (later_reused) {
+            assert(biter->HasCleanups());
+            // Make the existing cleanups on `biter` sharable:
+            shared_cleanable.Allocate();
+            // Move existing `biter` cleanup(s) to `shared_cleanable`
+            biter->DelegateCleanupsTo(&*shared_cleanable);
+            // Reference `shared_cleanable` as new cleanup for `biter`
+            shared_cleanable.RegisterCopyWith(biter);
+          }
+          assert(biter->HasCleanups());
+          value_pinner = biter;
+        } else {
+          value_pinner = nullptr;
+        }
+
         // Call the *saver function on each entry/block until it returns false
         for (; biter->Valid(); biter->Next()) {
           ParsedInternalKey parsed_key;
-          Cleanable dummy;
-          Cleanable* value_pinner = nullptr;
           Status pik_status = ParseInternalKey(
               biter->key(), &parsed_key, false /* log_err_key */);  // TODO
           if (!pik_status.ok()) {
             s = pik_status;
           }
-          if (biter->IsValuePinned()) {
-            if (reusing_block) {
-              Cache* block_cache = rep_->table_options.block_cache.get();
-              assert(biter->cache_handle() != nullptr);
-              block_cache->Ref(biter->cache_handle());
-              dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
-                                    biter->cache_handle());
-              value_pinner = &dummy;
-            } else {
-              value_pinner = biter;
-            }
-          }
           if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
                                       value_pinner)) {
             if (get_context->State() == GetContext::GetState::kFound) {
@@ -2858,7 +2888,10 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
           s = biter->status();
         }
         // Write the block cache access.
-        if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
+        // XXX: There appear to be 'break' statements above that bypass this
+        // writing of the block cache trace record
+        if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
+            !reusing_prev_block) {
           // Avoid making copy of block_key, cf_name, and referenced_key when
           // constructing the access record.
           Slice referenced_key;

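For readers skimming the hunks above, the sharing decision can be paraphrased as the following self-contained sketch. It is not the RocksDB code itself: PickValuePinner and block_pinner are hypothetical names, and block_pinner plays the role of biter (the iterator that owns the cleanup releasing the cached block).

#include "rocksdb/cleanable.h"

using ROCKSDB_NAMESPACE::Cleanable;
using ROCKSDB_NAMESPACE::SharedCleanablePtr;

Cleanable* PickValuePinner(Cleanable* block_pinner, bool value_is_pinned,
                           bool reusing_prev_block, bool later_reused,
                           SharedCleanablePtr& shared_cleanable) {
  if (!value_is_pinned) {
    return nullptr;  // nothing needs to be kept alive
  }
  if (reusing_prev_block) {
    // A later key on an already-shared block: add (or hand over) a cheap
    // reference to the shared wrapper instead of a block cache Ref().
    if (!block_pinner->HasCleanups()) {
      if (later_reused) {
        shared_cleanable.RegisterCopyWith(block_pinner);
      } else {
        shared_cleanable.MoveAsCleanupTo(block_pinner);
      }
    }
  } else if (later_reused) {
    // First key on a block that later keys will reuse: wrap the block's
    // single release cleanup in a refcounted Cleanable, then reference it.
    shared_cleanable.Allocate();
    block_pinner->DelegateCleanupsTo(&*shared_cleanable);
    shared_cleanable.RegisterCopyWith(block_pinner);
  }
  return block_pinner;  // values are pinned through the block's pinner
}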
table/cleanable_test.cc
@@ -3,6 +3,10 @@
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).

+#include "rocksdb/cleanable.h"
+
+#include <gtest/gtest.h>
+
 #include <functional>

 #include "port/port.h"
@@ -268,6 +272,115 @@ TEST_F(CleanableTest, PinnableSlice) {
   }
 }

+static void Decrement(void* intptr, void*) { --*static_cast<int*>(intptr); }
+
+// Allow unit testing moved-from data
+template <class T>
+void MarkInitializedForClangAnalyze(T& t) {
+  // No net effect, but confuse analyzer. (Published advice doesn't work.)
+  char* p = reinterpret_cast<char*>(&t);
+  std::swap(*p, *p);
+}
+
+TEST_F(CleanableTest, SharedWrapCleanables) {
+  int val = 5;
+  Cleanable c1, c2;
+  c1.RegisterCleanup(&Decrement, &val, nullptr);
+  c1.RegisterCleanup(&Decrement, &val, nullptr);
+  ASSERT_TRUE(c1.HasCleanups());
+  ASSERT_FALSE(c2.HasCleanups());
+
+  SharedCleanablePtr scp1;
+  ASSERT_EQ(scp1.get(), nullptr);
+
+  // No-ops
+  scp1.RegisterCopyWith(&c2);
+  scp1.MoveAsCleanupTo(&c2);
+
+  ASSERT_FALSE(c2.HasCleanups());
+  c2.RegisterCleanup(&Decrement, &val, nullptr);
+  c2.RegisterCleanup(&Decrement, &val, nullptr);
+  c2.RegisterCleanup(&Decrement, &val, nullptr);
+
+  scp1.Allocate();
+  ASSERT_NE(scp1.get(), nullptr);
+  ASSERT_FALSE(scp1->HasCleanups());
+
+  // Copy ctor (alias scp2 = scp1)
+  SharedCleanablePtr scp2{scp1};
+  ASSERT_EQ(scp1.get(), scp2.get());
+
+  c1.DelegateCleanupsTo(&*scp1);
+  ASSERT_TRUE(scp1->HasCleanups());
+  ASSERT_TRUE(scp2->HasCleanups());
+  ASSERT_FALSE(c1.HasCleanups());
+
+  SharedCleanablePtr scp3;
+  ASSERT_EQ(scp3.get(), nullptr);
+
+  // Copy operator (alias scp3 = scp2 = scp1)
+  scp3 = scp2;
+
+  // Make scp2 point elsewhere
+  scp2.Allocate();
+  c2.DelegateCleanupsTo(&*scp2);
+
+  ASSERT_EQ(val, 5);
+  // Move operator, invoke old c2 cleanups
+  scp2 = std::move(scp1);
+  ASSERT_EQ(val, 2);
+  MarkInitializedForClangAnalyze(scp1);
+  ASSERT_EQ(scp1.get(), nullptr);
+
+  // Move ctor
+  {
+    SharedCleanablePtr scp4{std::move(scp3)};
+    MarkInitializedForClangAnalyze(scp3);
+    ASSERT_EQ(scp3.get(), nullptr);
+    ASSERT_EQ(scp4.get(), scp2.get());
+
+    scp2.Reset();
+    ASSERT_EQ(val, 2);
+    // invoke old c1 cleanups
+  }
+  ASSERT_EQ(val, 0);
+}
+
+TEST_F(CleanableTest, CleanableWrapShared) {
+  int val = 5;
+  SharedCleanablePtr scp1, scp2;
+  scp1.Allocate();
+  scp1->RegisterCleanup(&Decrement, &val, nullptr);
+  scp1->RegisterCleanup(&Decrement, &val, nullptr);
+
+  scp2.Allocate();
+  scp2->RegisterCleanup(&Decrement, &val, nullptr);
+  scp2->RegisterCleanup(&Decrement, &val, nullptr);
+  scp2->RegisterCleanup(&Decrement, &val, nullptr);
+
+  {
+    Cleanable c1;
+    {
+      Cleanable c2, c3;
+      scp1.RegisterCopyWith(&c1);
+      scp1.MoveAsCleanupTo(&c2);
+      ASSERT_TRUE(c1.HasCleanups());
+      ASSERT_TRUE(c2.HasCleanups());
+      ASSERT_EQ(scp1.get(), nullptr);
+      scp2.MoveAsCleanupTo(&c3);
+      ASSERT_TRUE(c3.HasCleanups());
+      ASSERT_EQ(scp2.get(), nullptr);
+      c2.Reset();
+      ASSERT_FALSE(c2.HasCleanups());
+      ASSERT_EQ(val, 5);
+      // invoke cleanups from scp2
+    }
+    ASSERT_EQ(val, 2);
+    // invoke cleanups from scp1
+  }
+  ASSERT_EQ(val, 0);
+}
+
 }  // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {

table/iterator.cc
@@ -8,93 +8,13 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.

 #include "rocksdb/iterator.h"

 #include "memory/arena.h"
 #include "table/internal_iterator.h"
 #include "table/iterator_wrapper.h"

 namespace ROCKSDB_NAMESPACE {

-Cleanable::Cleanable() {
-  cleanup_.function = nullptr;
-  cleanup_.next = nullptr;
-}
-
-Cleanable::~Cleanable() { DoCleanup(); }
-
-Cleanable::Cleanable(Cleanable&& other) {
-  *this = std::move(other);
-}
-
-Cleanable& Cleanable::operator=(Cleanable&& other) {
-  if (this != &other) {
-    cleanup_ = other.cleanup_;
-    other.cleanup_.function = nullptr;
-    other.cleanup_.next = nullptr;
-  }
-  return *this;
-}
-
-// If the entire linked list was on heap we could have simply add attach one
-// link list to another. However the head is an embeded object to avoid the cost
-// of creating objects for most of the use cases when the Cleanable has only one
-// Cleanup to do. We could put evernything on heap if benchmarks show no
-// negative impact on performance.
-// Also we need to iterate on the linked list since there is no pointer to the
-// tail. We can add the tail pointer but maintainin it might negatively impact
-// the perforamnce for the common case of one cleanup where tail pointer is not
-// needed. Again benchmarks could clarify that.
-// Even without a tail pointer we could iterate on the list, find the tail, and
-// have only that node updated without the need to insert the Cleanups one by
-// one. This however would be redundant when the source Cleanable has one or a
-// few Cleanups which is the case most of the time.
-// TODO(myabandeh): if the list is too long we should maintain a tail pointer
-// and have the entire list (minus the head that has to be inserted separately)
-// merged with the target linked list at once.
-void Cleanable::DelegateCleanupsTo(Cleanable* other) {
-  assert(other != nullptr);
-  if (cleanup_.function == nullptr) {
-    return;
-  }
-  Cleanup* c = &cleanup_;
-  other->RegisterCleanup(c->function, c->arg1, c->arg2);
-  c = c->next;
-  while (c != nullptr) {
-    Cleanup* next = c->next;
-    other->RegisterCleanup(c);
-    c = next;
-  }
-  cleanup_.function = nullptr;
-  cleanup_.next = nullptr;
-}
-
-void Cleanable::RegisterCleanup(Cleanable::Cleanup* c) {
-  assert(c != nullptr);
-  if (cleanup_.function == nullptr) {
-    cleanup_.function = c->function;
-    cleanup_.arg1 = c->arg1;
-    cleanup_.arg2 = c->arg2;
-    delete c;
-  } else {
-    c->next = cleanup_.next;
-    cleanup_.next = c;
-  }
-}
-
-void Cleanable::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) {
-  assert(func != nullptr);
-  Cleanup* c;
-  if (cleanup_.function == nullptr) {
-    c = &cleanup_;
-  } else {
-    c = new Cleanup;
-    c->next = cleanup_.next;
-    cleanup_.next = c;
-  }
-  c->function = func;
-  c->arg1 = arg1;
-  c->arg2 = arg2;
-}
-
 Status Iterator::GetProperty(std::string prop_name, std::string* prop) {
   if (prop == nullptr) {
     return Status::InvalidArgument("prop is nullptr");

table/multiget_context.h
@@ -97,7 +97,10 @@ class MultiGetContext {
   // that need to be performed
   static const int MAX_BATCH_SIZE = 32;

-  static_assert(MAX_BATCH_SIZE < 64, "MAX_BATCH_SIZE cannot exceed 63");
+  // A bitmask of at least MAX_BATCH_SIZE - 1 bits, so that
+  // Mask{1} << MAX_BATCH_SIZE is well defined
+  using Mask = uint64_t;
+  static_assert(MAX_BATCH_SIZE < sizeof(Mask) * 8);

   MultiGetContext(autovector<KeyContext*, MAX_BATCH_SIZE>* sorted_keys,
                   size_t begin, size_t num_keys, SequenceNumber snapshot,
@@ -138,7 +141,7 @@ class MultiGetContext {
   char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK];
   std::array<KeyContext*, MAX_BATCH_SIZE> sorted_keys_;
   size_t num_keys_;
-  uint64_t value_mask_;
+  Mask value_mask_;
   uint64_t value_size_;
   std::unique_ptr<char[]> lookup_key_heap_buf;
   LookupKey* lookup_key_ptr_;
@@ -171,7 +174,7 @@ class MultiGetContext {
     Iterator(const Range* range, size_t idx)
         : range_(range), ctx_(range->ctx_), index_(idx) {
       while (index_ < range_->end_ &&
-             (uint64_t{1} << index_) &
+             (Mask{1} << index_) &
                  (range_->ctx_->value_mask_ | range_->skip_mask_))
         index_++;
     }
@@ -181,7 +184,7 @@ class MultiGetContext {

     Iterator& operator++() {
       while (++index_ < range_->end_ &&
-             (uint64_t{1} << index_) &
+             (Mask{1} << index_) &
                  (range_->ctx_->value_mask_ | range_->skip_mask_))
         ;
       return *this;
@@ -235,22 +238,22 @@ class MultiGetContext {

     bool empty() const { return RemainingMask() == 0; }

-    void SkipIndex(size_t index) { skip_mask_ |= uint64_t{1} << index; }
+    void SkipIndex(size_t index) { skip_mask_ |= Mask{1} << index; }

     void SkipKey(const Iterator& iter) { SkipIndex(iter.index_); }

     bool IsKeySkipped(const Iterator& iter) const {
-      return skip_mask_ & (uint64_t{1} << iter.index_);
+      return skip_mask_ & (Mask{1} << iter.index_);
     }

     // Update the value_mask_ in MultiGetContext so its
     // immediately reflected in all the Range Iterators
     void MarkKeyDone(Iterator& iter) {
-      ctx_->value_mask_ |= (uint64_t{1} << iter.index_);
+      ctx_->value_mask_ |= (Mask{1} << iter.index_);
     }

     bool CheckKeyDone(Iterator& iter) const {
-      return ctx_->value_mask_ & (uint64_t{1} << iter.index_);
+      return ctx_->value_mask_ & (Mask{1} << iter.index_);
     }

     uint64_t KeysLeft() const { return BitsSetToOne(RemainingMask()); }
@@ -269,15 +272,15 @@ class MultiGetContext {
     MultiGetContext* ctx_;
     size_t start_;
     size_t end_;
-    uint64_t skip_mask_;
+    Mask skip_mask_;

     Range(MultiGetContext* ctx, size_t num_keys)
         : ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) {
       assert(num_keys < 64);
     }

-    uint64_t RemainingMask() const {
-      return (((uint64_t{1} << end_) - 1) & ~((uint64_t{1} << start_) - 1) &
+    Mask RemainingMask() const {
+      return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) &
               ~(ctx_->value_mask_ | skip_mask_));
     }
   };

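The Mask{1} spelling in the hunks above is the point of the new typedef: the shift operand is promoted to a 64-bit type before shifting, so expressions like Mask{1} << MAX_BATCH_SIZE stay well defined. A tiny illustrative sketch (not RocksDB code) of the pitfall it avoids:

#include <cstddef>
#include <cstdint>

using Mask = uint64_t;

inline Mask KeyBit(size_t index) {
  // Well defined for index up to 63. A plain `1 << index` uses a 32-bit int
  // and is undefined behavior once index reaches 32 (e.g. MAX_BATCH_SIZE).
  return Mask{1} << index;
}

inline bool IsSet(Mask mask, size_t index) {
  return (mask & KeyBit(index)) != 0;
}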
util/cleanable.cc (new file)
@@ -0,0 +1,180 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "rocksdb/cleanable.h"
+
+#include <atomic>
+#include <cassert>
+
+namespace ROCKSDB_NAMESPACE {
+
+Cleanable::Cleanable() {
+  cleanup_.function = nullptr;
+  cleanup_.next = nullptr;
+}
+
+Cleanable::~Cleanable() { DoCleanup(); }
+
+Cleanable::Cleanable(Cleanable&& other) noexcept { *this = std::move(other); }
+
+Cleanable& Cleanable::operator=(Cleanable&& other) noexcept {
+  assert(this != &other);  // https://stackoverflow.com/a/9322542/454544
+  cleanup_ = other.cleanup_;
+  other.cleanup_.function = nullptr;
+  other.cleanup_.next = nullptr;
+  return *this;
+}
+
+// If the entire linked list was on heap we could have simply add attach one
+// link list to another. However the head is an embeded object to avoid the cost
+// of creating objects for most of the use cases when the Cleanable has only one
+// Cleanup to do. We could put evernything on heap if benchmarks show no
+// negative impact on performance.
+// Also we need to iterate on the linked list since there is no pointer to the
+// tail. We can add the tail pointer but maintainin it might negatively impact
+// the perforamnce for the common case of one cleanup where tail pointer is not
+// needed. Again benchmarks could clarify that.
+// Even without a tail pointer we could iterate on the list, find the tail, and
+// have only that node updated without the need to insert the Cleanups one by
+// one. This however would be redundant when the source Cleanable has one or a
+// few Cleanups which is the case most of the time.
+// TODO(myabandeh): if the list is too long we should maintain a tail pointer
+// and have the entire list (minus the head that has to be inserted separately)
+// merged with the target linked list at once.
+void Cleanable::DelegateCleanupsTo(Cleanable* other) {
+  assert(other != nullptr);
+  if (cleanup_.function == nullptr) {
+    return;
+  }
+  Cleanup* c = &cleanup_;
+  other->RegisterCleanup(c->function, c->arg1, c->arg2);
+  c = c->next;
+  while (c != nullptr) {
+    Cleanup* next = c->next;
+    other->RegisterCleanup(c);
+    c = next;
+  }
+  cleanup_.function = nullptr;
+  cleanup_.next = nullptr;
+}
+
+void Cleanable::RegisterCleanup(Cleanable::Cleanup* c) {
+  assert(c != nullptr);
+  if (cleanup_.function == nullptr) {
+    cleanup_.function = c->function;
+    cleanup_.arg1 = c->arg1;
+    cleanup_.arg2 = c->arg2;
+    delete c;
+  } else {
+    c->next = cleanup_.next;
+    cleanup_.next = c;
+  }
+}
+
+void Cleanable::RegisterCleanup(CleanupFunction func, void* arg1, void* arg2) {
+  assert(func != nullptr);
+  Cleanup* c;
+  if (cleanup_.function == nullptr) {
+    c = &cleanup_;
+  } else {
+    c = new Cleanup;
+    c->next = cleanup_.next;
+    cleanup_.next = c;
+  }
+  c->function = func;
+  c->arg1 = arg1;
+  c->arg2 = arg2;
+}
+
+struct SharedCleanablePtr::Impl : public Cleanable {
+  std::atomic<unsigned> ref_count{1};  // Start with 1 ref
+  void Ref() { ref_count.fetch_add(1, std::memory_order_relaxed); }
+  void Unref() {
+    if (ref_count.fetch_sub(1, std::memory_order_relaxed) == 1) {
+      // Last ref
+      delete this;
+    }
+  }
+  static void UnrefWrapper(void* arg1, void* /*arg2*/) {
+    static_cast<SharedCleanablePtr::Impl*>(arg1)->Unref();
+  }
+};
+
+void SharedCleanablePtr::Reset() {
+  if (ptr_) {
+    ptr_->Unref();
+    ptr_ = nullptr;
+  }
+}
+
+void SharedCleanablePtr::Allocate() {
+  Reset();
+  ptr_ = new Impl();
+}
+
+SharedCleanablePtr::SharedCleanablePtr(const SharedCleanablePtr& from) {
+  *this = from;
+}
+
+SharedCleanablePtr::SharedCleanablePtr(SharedCleanablePtr&& from) noexcept {
+  *this = std::move(from);
+}
+
+SharedCleanablePtr& SharedCleanablePtr::operator=(
+    const SharedCleanablePtr& from) {
+  if (this != &from) {
+    Reset();
+    ptr_ = from.ptr_;
+    if (ptr_) {
+      ptr_->Ref();
+    }
+  }
+  return *this;
+}
+
+SharedCleanablePtr& SharedCleanablePtr::operator=(
+    SharedCleanablePtr&& from) noexcept {
+  assert(this != &from);  // https://stackoverflow.com/a/9322542/454544
+  Reset();
+  ptr_ = from.ptr_;
+  from.ptr_ = nullptr;
+  return *this;
+}
+
+SharedCleanablePtr::~SharedCleanablePtr() { Reset(); }
+
+Cleanable& SharedCleanablePtr::operator*() {
+  return *ptr_;  // implicit upcast
+}
+
+Cleanable* SharedCleanablePtr::operator->() {
+  return ptr_;  // implicit upcast
+}
+
+Cleanable* SharedCleanablePtr::get() {
+  return ptr_;  // implicit upcast
+}
+
+void SharedCleanablePtr::RegisterCopyWith(Cleanable* target) {
+  if (ptr_) {
+    // "Virtual" copy of the pointer
+    ptr_->Ref();
+    target->RegisterCleanup(&Impl::UnrefWrapper, ptr_, nullptr);
+  }
+}
+
+void SharedCleanablePtr::MoveAsCleanupTo(Cleanable* target) {
+  if (ptr_) {
+    // "Virtual" move of the pointer
+    target->RegisterCleanup(&Impl::UnrefWrapper, ptr_, nullptr);
+    ptr_ = nullptr;
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE