From 6adc39e1bfd68310a8355014e600afde3f0da6ba Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Wed, 23 Jun 2021 10:24:39 -0700 Subject: [PATCH] Add an internal iterator that can measure the inflow of blobs (#8443) Summary: Follow-up to https://github.com/facebook/rocksdb/issues/8426 . The patch adds a new kind of `InternalIterator` that wraps another one and passes each key-value encountered to `BlobGarbageMeter` as inflow. This iterator will be used as an input iterator for compactions when the input SSTs reference blob files. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8443 Test Plan: `make check` Reviewed By: jay-zhuang Differential Revision: D29311987 Pulled By: ltamasi fbshipit-source-id: b4493b4c0c0c2e3c2ecc33c8969a5ef02de5d9d8 --- CMakeLists.txt | 1 + Makefile | 3 + TARGETS | 7 + db/blob/blob_counting_iterator.h | 142 +++++++++++ db/blob/blob_counting_iterator_test.cc | 325 +++++++++++++++++++++++++ src.mk | 1 + 6 files changed, 479 insertions(+) create mode 100644 db/blob/blob_counting_iterator.h create mode 100644 db/blob/blob_counting_iterator_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 763daabccd..3ced8eeef5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1124,6 +1124,7 @@ if(WITH_TESTS) list(APPEND TESTS cache/cache_test.cc cache/lru_cache_test.cc + db/blob/blob_counting_iterator_test.cc db/blob/blob_file_addition_test.cc db/blob/blob_file_builder_test.cc db/blob/blob_file_cache_test.cc diff --git a/Makefile b/Makefile index 3daa2c037b..736a628adb 100644 --- a/Makefile +++ b/Makefile @@ -1827,6 +1827,9 @@ block_cache_trace_analyzer_test: $(OBJ_DIR)/tools/block_cache_analyzer/block_cac defer_test: $(OBJ_DIR)/util/defer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_counting_iterator_test: $(OBJ_DIR)/db/blob/blob_counting_iterator_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + blob_file_addition_test: $(OBJ_DIR)/db/blob/blob_file_addition_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index ac8f9509b7..c73bd84ab5 100644 --- a/TARGETS +++ b/TARGETS @@ -908,6 +908,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_counting_iterator_test", + "db/blob/blob_counting_iterator_test.cc", + "parallel", + [], + [], + ], [ "blob_db_test", "utilities/blob_db/blob_db_test.cc", diff --git a/db/blob/blob_counting_iterator.h b/db/blob/blob_counting_iterator.h new file mode 100644 index 0000000000..b73f545eb3 --- /dev/null +++ b/db/blob/blob_counting_iterator.h @@ -0,0 +1,142 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include "db/blob/blob_garbage_meter.h" +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "table/internal_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +// An internal iterator that passes each key-value encountered to +// BlobGarbageMeter as inflow in order to measure the total number and size of +// blobs in the compaction input on a per-blob file basis. +class BlobCountingIterator : public InternalIterator { + public: + BlobCountingIterator(InternalIterator* iter, + BlobGarbageMeter* blob_garbage_meter) + : iter_(iter), blob_garbage_meter_(blob_garbage_meter) { + assert(iter_); + assert(blob_garbage_meter_); + + UpdateAndCountBlobIfNeeded(); + } + + bool Valid() const override { return iter_->Valid() && status_.ok(); } + + void SeekToFirst() override { + iter_->SeekToFirst(); + UpdateAndCountBlobIfNeeded(); + } + + void SeekToLast() override { + iter_->SeekToLast(); + UpdateAndCountBlobIfNeeded(); + } + + void Seek(const Slice& target) override { + iter_->Seek(target); + UpdateAndCountBlobIfNeeded(); + } + + void SeekForPrev(const Slice& target) override { + iter_->SeekForPrev(target); + UpdateAndCountBlobIfNeeded(); + } + + void Next() override { + assert(Valid()); + + iter_->Next(); + UpdateAndCountBlobIfNeeded(); + } + + bool NextAndGetResult(IterateResult* result) override { + assert(Valid()); + + const bool res = iter_->NextAndGetResult(result); + UpdateAndCountBlobIfNeeded(); + return res; + } + + void Prev() override { + assert(Valid()); + + iter_->Prev(); + UpdateAndCountBlobIfNeeded(); + } + + Slice key() const override { + assert(Valid()); + return iter_->key(); + } + + Slice user_key() const override { + assert(Valid()); + return iter_->user_key(); + } + + Slice value() const override { + assert(Valid()); + return iter_->value(); + } + + Status status() const override { return status_; } + + bool PrepareValue() override { + assert(Valid()); + return iter_->PrepareValue(); + } + + bool MayBeOutOfLowerBound() override { + assert(Valid()); + return iter_->MayBeOutOfLowerBound(); + } + + IterBoundCheck UpperBoundCheckResult() override { + assert(Valid()); + return iter_->UpperBoundCheckResult(); + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + iter_->SetPinnedItersMgr(pinned_iters_mgr); + } + + bool IsKeyPinned() const override { + assert(Valid()); + return iter_->IsKeyPinned(); + } + + bool IsValuePinned() const override { + assert(Valid()); + return iter_->IsValuePinned(); + } + + Status GetProperty(std::string prop_name, std::string* prop) override { + return iter_->GetProperty(prop_name, prop); + } + + private: + void UpdateAndCountBlobIfNeeded() { + assert(!iter_->Valid() || iter_->status().ok()); + + if (!iter_->Valid()) { + status_ = iter_->status(); + return; + } + + status_ = blob_garbage_meter_->ProcessInFlow(key(), value()); + } + + InternalIterator* iter_; + BlobGarbageMeter* blob_garbage_meter_; + Status status_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_counting_iterator_test.cc b/db/blob/blob_counting_iterator_test.cc new file mode 100644 index 0000000000..12ccbc75aa --- /dev/null +++ b/db/blob/blob_counting_iterator_test.cc @@ -0,0 +1,325 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_counting_iterator.h" + +#include +#include + +#include "db/blob/blob_garbage_meter.h" +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/dbformat.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +void CheckInFlow(const BlobGarbageMeter& blob_garbage_meter, + uint64_t blob_file_number, uint64_t count, uint64_t bytes) { + const auto& flows = blob_garbage_meter.flows(); + + const auto it = flows.find(blob_file_number); + if (it == flows.end()) { + ASSERT_EQ(count, 0); + ASSERT_EQ(bytes, 0); + return; + } + + const auto& in = it->second.GetInFlow(); + + ASSERT_EQ(in.GetCount(), count); + ASSERT_EQ(in.GetBytes(), bytes); +} + +TEST(BlobCountingIteratorTest, CountBlobs) { + // Note: the input consists of three key-values: two are blob references to + // different blob files, while the third one is a plain value. + constexpr char user_key0[] = "key0"; + constexpr char user_key1[] = "key1"; + constexpr char user_key2[] = "key2"; + + const std::vector keys{ + test::KeyStr(user_key0, 1, kTypeBlobIndex), + test::KeyStr(user_key1, 2, kTypeBlobIndex), + test::KeyStr(user_key2, 3, kTypeValue)}; + + constexpr uint64_t first_blob_file_number = 4; + constexpr uint64_t first_offset = 1000; + constexpr uint64_t first_size = 2000; + + std::string first_blob_index; + BlobIndex::EncodeBlob(&first_blob_index, first_blob_file_number, first_offset, + first_size, kNoCompression); + + constexpr uint64_t second_blob_file_number = 6; + constexpr uint64_t second_offset = 2000; + constexpr uint64_t second_size = 4000; + + std::string second_blob_index; + BlobIndex::EncodeBlob(&second_blob_index, second_blob_file_number, + second_offset, second_size, kNoCompression); + + const std::vector values{first_blob_index, second_blob_index, + "raw_value"}; + + assert(keys.size() == values.size()); + + test::VectorIterator input(keys, values); + BlobGarbageMeter blob_garbage_meter; + + BlobCountingIterator blob_counter(&input, &blob_garbage_meter); + + constexpr uint64_t first_expected_bytes = + first_size + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(user_key0) - 1); + constexpr uint64_t second_expected_bytes = + second_size + + BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(user_key1) - 1); + + // Call SeekToFirst and iterate forward + blob_counter.SeekToFirst(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[0]); + ASSERT_EQ(blob_counter.user_key(), user_key0); + ASSERT_EQ(blob_counter.value(), values[0]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 1, + first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 0, 0); + + blob_counter.Next(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[1]); + ASSERT_EQ(blob_counter.user_key(), user_key1); + ASSERT_EQ(blob_counter.value(), values[1]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 1, + first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 1, + second_expected_bytes); + + blob_counter.Next(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[2]); + ASSERT_EQ(blob_counter.user_key(), user_key2); + ASSERT_EQ(blob_counter.value(), values[2]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 1, + first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 1, + second_expected_bytes); + + blob_counter.Next(); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 1, + first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 1, + second_expected_bytes); + + // Do it again using NextAndGetResult + blob_counter.SeekToFirst(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[0]); + ASSERT_EQ(blob_counter.user_key(), user_key0); + ASSERT_EQ(blob_counter.value(), values[0]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 1, + second_expected_bytes); + + { + IterateResult result; + ASSERT_TRUE(blob_counter.NextAndGetResult(&result)); + ASSERT_EQ(result.key, keys[1]); + ASSERT_EQ(blob_counter.user_key(), user_key1); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[1]); + ASSERT_EQ(blob_counter.value(), values[1]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 2, + 2 * second_expected_bytes); + } + + { + IterateResult result; + ASSERT_TRUE(blob_counter.NextAndGetResult(&result)); + ASSERT_EQ(result.key, keys[2]); + ASSERT_EQ(blob_counter.user_key(), user_key2); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[2]); + ASSERT_EQ(blob_counter.value(), values[2]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 2, + 2 * second_expected_bytes); + } + + { + IterateResult result; + ASSERT_FALSE(blob_counter.NextAndGetResult(&result)); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 2, + 2 * second_expected_bytes); + } + + // Call SeekToLast and iterate backward + blob_counter.SeekToLast(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[2]); + ASSERT_EQ(blob_counter.user_key(), user_key2); + ASSERT_EQ(blob_counter.value(), values[2]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 2, + 2 * second_expected_bytes); + + blob_counter.Prev(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[1]); + ASSERT_EQ(blob_counter.user_key(), user_key1); + ASSERT_EQ(blob_counter.value(), values[1]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 2, + 2 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 3, + 3 * second_expected_bytes); + + blob_counter.Prev(); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[0]); + ASSERT_EQ(blob_counter.user_key(), user_key0); + ASSERT_EQ(blob_counter.value(), values[0]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 3, + 3 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 3, + 3 * second_expected_bytes); + + blob_counter.Prev(); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 3, + 3 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 3, + 3 * second_expected_bytes); + + // Call Seek for all keys (plus one that's greater than all of them) + blob_counter.Seek(keys[0]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[0]); + ASSERT_EQ(blob_counter.user_key(), user_key0); + ASSERT_EQ(blob_counter.value(), values[0]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 4, + 4 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 3, + 3 * second_expected_bytes); + + blob_counter.Seek(keys[1]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[1]); + ASSERT_EQ(blob_counter.user_key(), user_key1); + ASSERT_EQ(blob_counter.value(), values[1]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 4, + 4 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 4, + 4 * second_expected_bytes); + + blob_counter.Seek(keys[2]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[2]); + ASSERT_EQ(blob_counter.user_key(), user_key2); + ASSERT_EQ(blob_counter.value(), values[2]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 4, + 4 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 4, + 4 * second_expected_bytes); + + blob_counter.Seek("zzz"); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 4, + 4 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 4, + 4 * second_expected_bytes); + + // Call SeekForPrev for all keys (plus one that's less than all of them) + blob_counter.SeekForPrev("aaa"); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 4, + 4 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 4, + 4 * second_expected_bytes); + + blob_counter.SeekForPrev(keys[0]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[0]); + ASSERT_EQ(blob_counter.user_key(), user_key0); + ASSERT_EQ(blob_counter.value(), values[0]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 5, + 5 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 4, + 4 * second_expected_bytes); + + blob_counter.SeekForPrev(keys[1]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[1]); + ASSERT_EQ(blob_counter.user_key(), user_key1); + ASSERT_EQ(blob_counter.value(), values[1]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 5, + 5 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 5, + 5 * second_expected_bytes); + + blob_counter.SeekForPrev(keys[2]); + ASSERT_TRUE(blob_counter.Valid()); + ASSERT_OK(blob_counter.status()); + ASSERT_EQ(blob_counter.key(), keys[2]); + ASSERT_EQ(blob_counter.user_key(), user_key2); + ASSERT_EQ(blob_counter.value(), values[2]); + CheckInFlow(blob_garbage_meter, first_blob_file_number, 5, + 5 * first_expected_bytes); + CheckInFlow(blob_garbage_meter, second_blob_file_number, 5, + 5 * second_expected_bytes); +} + +TEST(BlobCountingIteratorTest, CorruptBlobIndex) { + const std::vector keys{ + test::KeyStr("user_key", 1, kTypeBlobIndex)}; + const std::vector values{"i_am_not_a_blob_index"}; + + assert(keys.size() == values.size()); + + test::VectorIterator input(keys, values); + BlobGarbageMeter blob_garbage_meter; + + BlobCountingIterator blob_counter(&input, &blob_garbage_meter); + + blob_counter.SeekToFirst(); + ASSERT_FALSE(blob_counter.Valid()); + ASSERT_NOK(blob_counter.status()); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src.mk b/src.mk index 95ba2e70d4..fef66d0577 100644 --- a/src.mk +++ b/src.mk @@ -379,6 +379,7 @@ BENCH_MAIN_SOURCES = \ TEST_MAIN_SOURCES = \ cache/cache_test.cc \ cache/lru_cache_test.cc \ + db/blob/blob_counting_iterator_test.cc \ db/blob/blob_file_addition_test.cc \ db/blob/blob_file_builder_test.cc \ db/blob/blob_file_cache_test.cc \