mirror of https://github.com/facebook/rocksdb.git
`GetAggregatedIntProperty` accumulates property once per block cache (#12755)
Summary: Fix issue https://github.com/facebook/rocksdb/issues/12687. A block cache may be shared by multiple column families. Therefore, when getting the aggregated property of the block cache, we need to deduplicate by instances of the block cache, meaning the same instance should only be counted once. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12755 Reviewed By: jowlyzhang Differential Revision: D58508819 Pulled By: ajkr fbshipit-source-id: 3b746841d7eac59f900387ec3b8c19dbcd20aae4
This commit is contained in:
parent
b8c9a2576a
commit
9f95aa8269
|
@ -4537,8 +4537,11 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
|
|||
if (property_info == nullptr || property_info->handle_int == nullptr) {
|
||||
return false;
|
||||
}
|
||||
auto aggregator = CreateIntPropertyAggregator(property);
|
||||
if (aggregator == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
uint64_t sum = 0;
|
||||
bool ret = true;
|
||||
{
|
||||
// Needs mutex to protect the list of column families.
|
||||
|
@ -4552,14 +4555,14 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
|
|||
// GetIntPropertyInternal may release db mutex and re-acquire it.
|
||||
mutex_.AssertHeld();
|
||||
if (ret) {
|
||||
sum += value;
|
||||
aggregator->Add(cfd, value);
|
||||
} else {
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
*aggregated_value = sum;
|
||||
*aggregated_value = aggregator->Aggregate();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -192,6 +192,49 @@ TEST_F(DBPropertiesTest, GetAggregatedIntPropertyTest) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(DBPropertiesTest, AggregateBlockCacheProperty) {
|
||||
constexpr size_t kCapacity = 1000;
|
||||
LRUCacheOptions co;
|
||||
co.capacity = kCapacity;
|
||||
co.num_shard_bits = 0;
|
||||
co.metadata_charge_policy = kDontChargeCacheMetadata;
|
||||
auto block_cache = NewLRUCache(co);
|
||||
|
||||
// All columns families share the same block cache.
|
||||
Options options = CurrentOptions();
|
||||
BlockBasedTableOptions table_opt;
|
||||
table_opt.no_block_cache = false;
|
||||
table_opt.block_cache = block_cache;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_opt));
|
||||
|
||||
CreateAndReopenWithCF({"one", "two", "three", "four"}, options);
|
||||
|
||||
// Insert unpinned block to the cache
|
||||
constexpr size_t kSize1 = 100;
|
||||
ASSERT_OK(block_cache->Insert("block1", nullptr /*value*/,
|
||||
&kNoopCacheItemHelper, kSize1));
|
||||
// Insert pinned block to the cache
|
||||
constexpr size_t kSize2 = 200;
|
||||
Cache::Handle* block2 = nullptr;
|
||||
ASSERT_OK(block_cache->Insert("block2", nullptr /*value*/,
|
||||
&kNoopCacheItemHelper, kSize2, &block2));
|
||||
|
||||
uint64_t value;
|
||||
ASSERT_TRUE(db_->GetAggregatedIntProperty(DB::Properties::kBlockCacheCapacity,
|
||||
&value));
|
||||
ASSERT_EQ(value, kCapacity);
|
||||
|
||||
ASSERT_TRUE(
|
||||
db_->GetAggregatedIntProperty(DB::Properties::kBlockCacheUsage, &value));
|
||||
ASSERT_EQ(value, kSize1 + kSize2);
|
||||
|
||||
ASSERT_TRUE(db_->GetAggregatedIntProperty(
|
||||
DB::Properties::kBlockCachePinnedUsage, &value));
|
||||
ASSERT_EQ(value, kSize2);
|
||||
|
||||
block_cache->Release(block2);
|
||||
}
|
||||
|
||||
namespace {
|
||||
void VerifySimilar(uint64_t a, uint64_t b, double bias) {
|
||||
ASSERT_EQ(a == 0U, b == 0U);
|
||||
|
@ -1083,7 +1126,6 @@ TEST_F(DBPropertiesTest, EstimateCompressionRatio) {
|
|||
ASSERT_GT(CompressionRatioAtLevel(1), 10.0);
|
||||
}
|
||||
|
||||
|
||||
class CountingUserTblPropCollector : public TablePropertiesCollector {
|
||||
public:
|
||||
const char* Name() const override { return "CountingUserTblPropCollector"; }
|
||||
|
@ -2366,7 +2408,6 @@ TEST_F(DBPropertiesTest, TableMetaIndexKeys) {
|
|||
} while (ChangeOptions());
|
||||
}
|
||||
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <limits>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
|
@ -33,7 +34,6 @@
|
|||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
|
||||
const std::map<LevelStatType, LevelStat> InternalStats::compaction_level_stats =
|
||||
{
|
||||
{LevelStatType::NUM_FILES, LevelStat{"NumFiles", "Files"}},
|
||||
|
@ -2135,5 +2135,64 @@ void InternalStats::DumpCFFileHistogram(std::string* value) {
|
|||
value->append(oss.str());
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class SumPropertyAggregator : public IntPropertyAggregator {
|
||||
public:
|
||||
SumPropertyAggregator() : aggregated_value_(0) {}
|
||||
virtual ~SumPropertyAggregator() override = default;
|
||||
|
||||
void Add(ColumnFamilyData* cfd, uint64_t value) override {
|
||||
(void)cfd;
|
||||
aggregated_value_ += value;
|
||||
}
|
||||
|
||||
uint64_t Aggregate() const override { return aggregated_value_; }
|
||||
|
||||
private:
|
||||
uint64_t aggregated_value_;
|
||||
};
|
||||
|
||||
// A block cache may be shared by multiple column families.
|
||||
// BlockCachePropertyAggregator ensures that the same cache is only added once.
|
||||
class BlockCachePropertyAggregator : public IntPropertyAggregator {
|
||||
public:
|
||||
BlockCachePropertyAggregator() = default;
|
||||
virtual ~BlockCachePropertyAggregator() override = default;
|
||||
|
||||
void Add(ColumnFamilyData* cfd, uint64_t value) override {
|
||||
auto* table_factory = cfd->ioptions()->table_factory.get();
|
||||
assert(table_factory != nullptr);
|
||||
Cache* cache =
|
||||
table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
|
||||
if (cache != nullptr) {
|
||||
block_cache_properties_.emplace(cache, value);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t Aggregate() const override {
|
||||
uint64_t sum = 0;
|
||||
for (const auto& p : block_cache_properties_) {
|
||||
sum += p.second;
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<Cache*, uint64_t> block_cache_properties_;
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
std::unique_ptr<IntPropertyAggregator> CreateIntPropertyAggregator(
|
||||
const Slice& property) {
|
||||
if (property == DB::Properties::kBlockCacheCapacity ||
|
||||
property == DB::Properties::kBlockCacheUsage ||
|
||||
property == DB::Properties::kBlockCachePinnedUsage) {
|
||||
return std::make_unique<BlockCachePropertyAggregator>();
|
||||
} else {
|
||||
return std::make_unique<SumPropertyAggregator>();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -873,5 +873,24 @@ class InternalStats {
|
|||
uint64_t started_at_;
|
||||
};
|
||||
|
||||
// IntPropertyAggregator aggregates an integer property across all column
|
||||
// families.
|
||||
class IntPropertyAggregator {
|
||||
public:
|
||||
IntPropertyAggregator() {}
|
||||
virtual ~IntPropertyAggregator() {}
|
||||
|
||||
IntPropertyAggregator(const IntPropertyAggregator&) = delete;
|
||||
void operator=(const IntPropertyAggregator&) = delete;
|
||||
|
||||
// Add a column family's property value to the aggregator.
|
||||
virtual void Add(ColumnFamilyData* cfd, uint64_t value) = 0;
|
||||
|
||||
// Get the aggregated value.
|
||||
virtual uint64_t Aggregate() const = 0;
|
||||
};
|
||||
|
||||
std::unique_ptr<IntPropertyAggregator> CreateIntPropertyAggregator(
|
||||
const Slice& property);
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
Loading…
Reference in New Issue