mirror of https://github.com/facebook/rocksdb.git

Minor improvement to CacheReservationManager/WriteBufferManager/CompressionDictBuilding (#9139)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9139
Reviewed By: zhichao-cao
Differential Revision: D32211415
Pulled By: hx235
fbshipit-source-id: 39ce036ba34e1fb4a1992a33ac6904a4a943301d

This commit is contained in:
parent 5237b39d2e
commit 3018a3e27e
cache/cache_reservation_manager_test.cc

@@ -22,16 +22,15 @@
 namespace ROCKSDB_NAMESPACE {
 class CacheReservationManagerTest : public ::testing::Test {
  protected:
-  static constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
-  static constexpr int kNumShardBits = 0;  // 2^0 shard
-
   static constexpr std::size_t kSizeDummyEntry =
       CacheReservationManager::GetDummyEntrySize();
+  static constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
+  static constexpr int kNumShardBits = 0;  // 2^0 shard
   static const std::size_t kCacheKeyPrefixSize =
       BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length;
   static constexpr std::size_t kMetaDataChargeOverhead = 10000;
 
-  std::shared_ptr<Cache> cache = NewLRUCache(kOneGigabyte, kNumShardBits);
+  std::shared_ptr<Cache> cache = NewLRUCache(kCacheCapacity, kNumShardBits);
   std::unique_ptr<CacheReservationManager> test_cache_rev_mng;
 
   CacheReservationManagerTest() {
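A note on the fixture rewrite above: cache capacities are now expressed in dummy-entry units instead of absolute byte counts, so the tests survive any future change to the dummy entry size. A minimal sketch of the arithmetic, assuming the current 256 KiB value of GetDummyEntrySize() (the tests themselves deliberately avoid hard-coding it):

#include <cstddef>

// Assumed value of CacheReservationManager::GetDummyEntrySize(); an
// implementation detail the rewritten tests no longer depend on.
static constexpr std::size_t kSizeDummyEntry = 256 * 1024;  // 256 KiB

// 4096 dummy entries equal the old kOneGigabyte (4096 * 256 KiB = 1 GiB) ...
static constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
// ... and 4 dummy entries equal the old kOneMegabyte (4 * 256 KiB = 1 MiB),
// so the tests keep their proportions even if the entry size changes.
static constexpr std::size_t kSmallCacheCapacity = 4 * kSizeDummyEntry;

static_assert(kCacheCapacity == 1024 * 1024 * 1024, "old kOneGigabyte");
static_assert(kSmallCacheCapacity == 1024 * 1024, "old kOneMegabyte");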
@@ -142,21 +141,22 @@ TEST_F(CacheReservationManagerTest,
 
 TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
      IncreaseCacheReservationOnFullCache) {
-  constexpr std::size_t kOneMegabyte = 1024 * 1024;
-  constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
+  ;
   constexpr std::size_t kSizeDummyEntry =
       CacheReservationManager::GetDummyEntrySize();
+  constexpr std::size_t kSmallCacheCapacity = 4 * kSizeDummyEntry;
+  constexpr std::size_t kBigCacheCapacity = 4096 * kSizeDummyEntry;
   constexpr std::size_t kMetaDataChargeOverhead = 10000;
 
   LRUCacheOptions lo;
-  lo.capacity = kOneMegabyte;
+  lo.capacity = kSmallCacheCapacity;
   lo.num_shard_bits = 0;  // 2^0 shard
   lo.strict_capacity_limit = true;
   std::shared_ptr<Cache> cache = NewLRUCache(lo);
   std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
       new CacheReservationManager(cache));
 
-  std::size_t new_mem_used = kOneMegabyte + 1;
+  std::size_t new_mem_used = kSmallCacheCapacity + 1;
   Status s =
       test_cache_rev_mng
           ->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
@@ -168,18 +168,19 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
             1 * kSizeDummyEntry)
       << "Failed to bookkeep correctly before cache resevation failure happens "
          "due to full cache";
-  EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte)
+  EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            kSmallCacheCapacity)
       << "Failed to bookkeep correctly (i.e, bookkeep only successful dummy "
          "entry insertions) when encountering cache resevation failure due to "
          "full cache";
   EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry)
       << "Failed to insert underlying dummy entries correctly when "
          "encountering cache resevation failure due to full cache";
-  EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte)
+  EXPECT_LE(cache->GetPinnedUsage(), kSmallCacheCapacity)
       << "Failed to insert underlying dummy entries correctly when "
          "encountering cache resevation failure due to full cache";
 
-  new_mem_used = kOneMegabyte / 2;  // 2 dummy entries
+  new_mem_used = kSmallCacheCapacity / 2;  // 2 dummy entries
   s = test_cache_rev_mng
           ->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
               new_mem_used);
@@ -201,7 +202,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
          "to full cache";
 
   // Create cache full again for subsequent tests
-  new_mem_used = kOneMegabyte + 1;
+  new_mem_used = kSmallCacheCapacity + 1;
   s = test_cache_rev_mng
           ->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
               new_mem_used);
@@ -212,21 +213,22 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
             1 * kSizeDummyEntry)
       << "Failed to bookkeep correctly before cache resevation failure happens "
          "due to full cache";
-  EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte)
+  EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            kSmallCacheCapacity)
       << "Failed to bookkeep correctly (i.e, bookkeep only successful dummy "
          "entry insertions) when encountering cache resevation failure due to "
          "full cache";
   EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry)
       << "Failed to insert underlying dummy entries correctly when "
          "encountering cache resevation failure due to full cache";
-  EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte)
+  EXPECT_LE(cache->GetPinnedUsage(), kSmallCacheCapacity)
       << "Failed to insert underlying dummy entries correctly when "
          "encountering cache resevation failure due to full cache";
 
   // Increase cache capacity so the previously failed insertion can fully
   // succeed
-  cache->SetCapacity(kOneGigabyte);
-  new_mem_used = kOneMegabyte + 1;
+  cache->SetCapacity(kBigCacheCapacity);
+  new_mem_used = kSmallCacheCapacity + 1;
   s = test_cache_rev_mng
           ->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
               new_mem_used);
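For readers unfamiliar with the API under test, here is a sketch of the behavior the assertions above pin down (names taken from the test; the exact failure Status is an assumption):

// Ask for more memory than the small cache can hold: UpdateCacheReservation
// inserts dummy entries one at a time until an insertion fails, so it
// bookkeeps at least one entry but never more than the cache capacity.
std::size_t new_mem_used = kSmallCacheCapacity + 1;
Status s =
    test_cache_rev_mng
        ->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
            new_mem_used);
assert(!s.ok());  // presumably Status::Incomplete on a full cache
assert(test_cache_rev_mng->GetTotalReservedCacheSize() >= 1 * kSizeDummyEntry);
assert(test_cache_rev_mng->GetTotalReservedCacheSize() <= kSmallCacheCapacity);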
@@ -308,13 +310,13 @@ TEST_F(CacheReservationManagerTest,
 
 TEST(CacheReservationManagerWithDelayedDecreaseTest,
      DecreaseCacheReservationWithDelayedDecrease) {
-  constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
   constexpr std::size_t kSizeDummyEntry =
       CacheReservationManager::GetDummyEntrySize();
+  constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
   constexpr std::size_t kMetaDataChargeOverhead = 10000;
 
   LRUCacheOptions lo;
-  lo.capacity = kOneGigabyte;
+  lo.capacity = kCacheCapacity;
   lo.num_shard_bits = 0;
   std::shared_ptr<Cache> cache = NewLRUCache(lo);
   std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
@@ -381,13 +383,13 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
 
 TEST(CacheReservationManagerDestructorTest,
      ReleaseRemainingDummyEntriesOnDestruction) {
-  constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
   constexpr std::size_t kSizeDummyEntry =
       CacheReservationManager::GetDummyEntrySize();
+  constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
   constexpr std::size_t kMetaDataChargeOverhead = 10000;
 
   LRUCacheOptions lo;
-  lo.capacity = kOneGigabyte;
+  lo.capacity = kCacheCapacity;
   lo.num_shard_bits = 0;
   std::shared_ptr<Cache> cache = NewLRUCache(lo);
   {
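Only the setup of the destructor test fits in this hunk; the property it verifies, per its name, is that remaining dummy entries are released when the manager is destroyed. A hypothetical sketch of that shape (assertion values assumed, not from this diff):

{
  std::unique_ptr<CacheReservationManager> mgr(
      new CacheReservationManager(cache));
  // Reserve some memory; dummy entries are pinned in the cache ...
  Status s = mgr->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
      2 * kSizeDummyEntry);
  assert(s.ok());
  assert(cache->GetPinnedUsage() >= 2 * kSizeDummyEntry);
}  // ... and the destructor releases the remaining dummy entries here.
assert(cache->GetPinnedUsage() == 0);  // all reservations released (assumed)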
include/rocksdb/write_buffer_manager.h

@@ -61,7 +61,7 @@ class WriteBufferManager final {
   bool enabled() const { return buffer_size() > 0; }
 
   // Returns true if pointer to cache is passed.
-  bool cost_to_cache() const { return cache_rev_mng_ != nullptr; }
+  bool cost_to_cache() const { return cache_res_mgr_ != nullptr; }
 
   // Returns the total memory used by memtables.
   // Only valid if enabled()
@@ -158,9 +158,9 @@ class WriteBufferManager final {
   std::atomic<size_t> memory_used_;
   // Memory that hasn't been scheduled to free.
   std::atomic<size_t> memory_active_;
-  std::unique_ptr<CacheReservationManager> cache_rev_mng_;
-  // Protects cache_rev_mng_
-  std::mutex cache_rev_mng_mu_;
+  std::unique_ptr<CacheReservationManager> cache_res_mgr_;
+  // Protects cache_res_mgr_
+  std::mutex cache_res_mgr_mu_;
 
   std::list<StallInterface*> queue_;
   // Protects the queue_ and stall_active_.
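The rename from cache_rev_mng_ to cache_res_mgr_ is cosmetic; the public behavior of cost_to_cache() is unchanged. A usage sketch (constructor parameters assumed from the surrounding diff, which shows a cache pointer and an allow_stall argument):

#include <cassert>
#include <memory>
#include "rocksdb/cache.h"
#include "rocksdb/write_buffer_manager.h"

// Charging memtable memory to a block cache: passing a cache pointer is what
// makes cost_to_cache() return true.
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> cache =
    ROCKSDB_NAMESPACE::NewLRUCache(1 << 30 /* 1 GiB */);
auto wbm = std::make_shared<ROCKSDB_NAMESPACE::WriteBufferManager>(
    512 << 20 /* buffer_size: 512 MiB */, cache);
assert(wbm->cost_to_cache());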
memtable/write_buffer_manager.cc

@@ -23,7 +23,7 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
       mutable_limit_(buffer_size_ * 7 / 8),
       memory_used_(0),
       memory_active_(0),
-      cache_rev_mng_(nullptr),
+      cache_res_mgr_(nullptr),
       allow_stall_(allow_stall),
       stall_active_(false) {
 #ifndef ROCKSDB_LITE
@@ -31,7 +31,7 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
     // Memtable's memory usage tends to fluctuate frequently
     // therefore we set delayed_decrease = true to save some dummy entry
     // insertion on memory increase right after memory decrease
-    cache_rev_mng_.reset(
+    cache_res_mgr_.reset(
         new CacheReservationManager(cache, true /* delayed_decrease */));
   }
 #else
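The three comment lines in this hunk are the whole rationale for the delayed_decrease flag. A short sketch of the intended effect, where mgr is a CacheReservationManager built with delayed_decrease = true and kDummy stands for its dummy entry size (both names hypothetical; behavior summarized from the comment, not verified against the implementation):

// Dropping the reported memory does not immediately erase dummy entries; the
// manager keeps the reservation until usage falls well below it. Memtable
// usage oscillates, so this avoids an erase followed by an immediate re-insert.
mgr->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(2 * kDummy);  // reserve 2
mgr->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(1 * kDummy);  // kept at 2 (delayed)
mgr->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(2 * kDummy);  // no cache op needed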
@@ -47,15 +47,15 @@ WriteBufferManager::~WriteBufferManager() {
 }
 
 std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
-  if (cache_rev_mng_ != nullptr) {
-    return cache_rev_mng_->GetTotalReservedCacheSize();
+  if (cache_res_mgr_ != nullptr) {
+    return cache_res_mgr_->GetTotalReservedCacheSize();
   } else {
     return 0;
   }
 }
 
 void WriteBufferManager::ReserveMem(size_t mem) {
-  if (cache_rev_mng_ != nullptr) {
+  if (cache_res_mgr_ != nullptr) {
     ReserveMemWithCache(mem);
   } else if (enabled()) {
     memory_used_.fetch_add(mem, std::memory_order_relaxed);
@@ -68,15 +68,15 @@ void WriteBufferManager::ReserveMem(size_t mem) {
 // Should only be called from write thread
 void WriteBufferManager::ReserveMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rev_mng_ != nullptr);
+  assert(cache_res_mgr_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
+  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
 
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
   Status s =
-      cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
+      cache_res_mgr_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
           new_mem_used);
 
   // We absorb the error since WriteBufferManager is not able to handle
@@ -97,7 +97,7 @@ void WriteBufferManager::ScheduleFreeMem(size_t mem) {
 }
 
 void WriteBufferManager::FreeMem(size_t mem) {
-  if (cache_rev_mng_ != nullptr) {
+  if (cache_res_mgr_ != nullptr) {
     FreeMemWithCache(mem);
   } else if (enabled()) {
     memory_used_.fetch_sub(mem, std::memory_order_relaxed);
@@ -108,14 +108,14 @@ void WriteBufferManager::FreeMem(size_t mem) {
 
 void WriteBufferManager::FreeMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rev_mng_ != nullptr);
+  assert(cache_res_mgr_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
+  std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
   Status s =
-      cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
+      cache_res_mgr_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
          new_mem_used);
 
   // We absorb the error since WriteBufferManager is not able to handle
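Putting ReserveMemWithCache and FreeMemWithCache together, the write path drives the reservation roughly like this, continuing the wbm sketch from the header section above (sizes illustrative; all calls appear in the hunks above):

// A memtable allocates 16 MiB: memory_used_ grows and, because a cache was
// supplied, the same total is mirrored into the block cache as dummy
// entries under CacheEntryRole::kWriteBuffer.
wbm->ReserveMem(16 << 20);
size_t charged = wbm->dummy_entries_in_cache_usage();  // multiple of dummy size

// A flush completes: the memory is scheduled and then freed, and the cache
// reservation shrinks (possibly lazily, given delayed_decrease above).
wbm->ScheduleFreeMem(16 << 20);
wbm->FreeMem(16 << 20);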
table/block_based/block_based_table_builder.cc

@@ -1931,6 +1931,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
   }
   r->data_block_buffers.clear();
   r->data_begin_offset = 0;
+  // Release all reserved cache for data block buffers
   if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
     Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation<
         CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
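The hunk above is cut off mid-call by the page scrape. Based on the identical UpdateCacheReservation pattern in WriteBufferManager above, the statement presumably completes by shrinking the reservation to match the now-cleared buffers, along these lines (a sketch, not the verbatim source):

Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation<
    CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
    0 /* new_mem_used: buffers cleared, release the whole reservation */);
s.PermitUncheckedError();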