From 0ecfc4fbb45c14cfc34054d2f7f6fdfd910b46f9 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Mon, 6 Nov 2023 15:04:41 -0800 Subject: [PATCH] AttributeGroups - GetEntity Implementation (#11943) Summary: Implementation of `GetEntity()` API that returns wide-column entities as AttributeGroups from multiple column families for a single key. Regarding the definition of Attribute groups, please see the detailed example description in PR https://github.com/facebook/rocksdb/issues/11925 Pull Request resolved: https://github.com/facebook/rocksdb/pull/11943 Test Plan: - `DBWideBasicTest::GetEntityAsPinnableAttributeGroups` added will enable the new API in the `db_stress` after merging Reviewed By: ltamasi Differential Revision: D50195794 Pulled By: jaykorean fbshipit-source-id: 218d54841ac7e337de62e13b1233b0a99bd91af3 --- db/db_impl/db_impl.cc | 47 ++++++++++++ db/db_impl/db_impl.h | 2 + db/wide/db_wide_basic_test.cc | 135 +++++++++++++++++++++++++++++++--- include/rocksdb/db.h | 10 +++ 4 files changed, 184 insertions(+), 10 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7416369570..a36eda16f6 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2057,6 +2057,53 @@ Status DBImpl::GetEntity(const ReadOptions& _read_options, return GetImpl(read_options, key, get_impl_options); } +Status DBImpl::GetEntity(const ReadOptions& _read_options, const Slice& key, + PinnableAttributeGroups* result) { + if (!result) { + return Status::InvalidArgument( + "Cannot call GetEntity without PinnableAttributeGroups object"); + } + const size_t num_column_families = result->size(); + if (_read_options.io_activity != Env::IOActivity::kUnknown && + _read_options.io_activity != Env::IOActivity::kGetEntity) { + Status s = Status::InvalidArgument( + "Cannot call GetEntity with `ReadOptions::io_activity` != " + "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGetEntity`"); + for (size_t i = 0; i < num_column_families; ++i) { + (*result)[i].SetStatus(s); + } 
+ return s; + } + // return early if no CF was passed in + if (num_column_families == 0) { + return Status::OK(); + } + ReadOptions read_options(_read_options); + if (read_options.io_activity == Env::IOActivity::kUnknown) { + read_options.io_activity = Env::IOActivity::kGetEntity; + } + std::vector<Slice> keys; + std::vector<ColumnFamilyHandle*> column_families; + for (size_t i = 0; i < num_column_families; ++i) { + // Adding the same key slice for different CFs + keys.emplace_back(key); + column_families.emplace_back((*result)[i].column_family()); + } + std::vector<PinnableWideColumns> columns(num_column_families); + std::vector<Status> statuses(num_column_families); + MultiGetCommon( + read_options, num_column_families, column_families.data(), keys.data(), + /* values */ nullptr, columns.data(), + /* timestamps */ nullptr, statuses.data(), /* sorted_input */ false); + // Set results + for (size_t i = 0; i < num_column_families; ++i) { + (*result)[i].Reset(); + (*result)[i].SetStatus(statuses[i]); + (*result)[i].SetColumns(std::move(columns[i])); + } + return Status::OK(); +} + bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) { // If both thresholds are reached, a function returning merge operands as // `PinnableSlice`s should reference the `SuperVersion` to avoid large and/or diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6769d7f501..4ac6a2d14c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -242,6 +242,8 @@ class DBImpl : public DB { Status GetEntity(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableWideColumns* columns) override; + Status GetEntity(const ReadOptions& options, const Slice& key, + PinnableAttributeGroups* result) override; using DB::GetMergeOperands; Status GetMergeOperands(const ReadOptions& options, diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index 03edf26b6a..821e9b54a7 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -236,6 +236,121 @@ 
TEST_F(DBWideBasicTest, PutEntityColumnFamily) { ASSERT_OK(db_->Write(WriteOptions(), &batch)); } +TEST_F(DBWideBasicTest, GetEntityAsPinnableAttributeGroups) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"hot_cf", "cold_cf"}, options); + + constexpr int kDefaultCfHandleIndex = 0; + constexpr int kHotCfHandleIndex = 1; + constexpr int kColdCfHandleIndex = 2; + + constexpr char first_key[] = "first"; + WideColumns first_default_columns{ + {"default_cf_col_1_name", "first_key_default_cf_col_1_value"}, + {"default_cf_col_2_name", "first_key_default_cf_col_2_value"}}; + WideColumns first_hot_columns{ + {"hot_cf_col_1_name", "first_key_hot_cf_col_1_value"}, + {"hot_cf_col_2_name", "first_key_hot_cf_col_2_value"}}; + WideColumns first_cold_columns{ + {"cold_cf_col_1_name", "first_key_cold_cf_col_1_value"}}; + + constexpr char second_key[] = "second"; + WideColumns second_hot_columns{ + {"hot_cf_col_1_name", "second_key_hot_cf_col_1_value"}}; + WideColumns second_cold_columns{ + {"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}}; + + // TODO - update this to use the multi-attribute-group PutEntity when ready + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex], + first_key, first_default_columns)); + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex], + first_key, first_hot_columns)); + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex], + first_key, first_cold_columns)); + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex], + second_key, second_hot_columns)); + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex], + second_key, second_cold_columns)); + + std::vector<ColumnFamilyHandle*> all_cfs = handles_; + std::vector<ColumnFamilyHandle*> default_and_hot_cfs{ + {handles_[kDefaultCfHandleIndex], handles_[kHotCfHandleIndex]}}; + std::vector<ColumnFamilyHandle*> hot_and_cold_cfs{ + {handles_[kHotCfHandleIndex], handles_[kColdCfHandleIndex]}}; + auto create_result = + [](const std::vector<ColumnFamilyHandle*>& column_families) + -> 
PinnableAttributeGroups { + PinnableAttributeGroups result; + for (size_t i = 0; i < column_families.size(); ++i) { + result.emplace_back(column_families[i]); + } + return result; + }; + + { + // Case 1. Get first key from default cf and hot_cf and second key from + // hot_cf and cold_cf + constexpr size_t num_column_families = 2; + PinnableAttributeGroups first_key_result = + create_result(default_and_hot_cfs); + PinnableAttributeGroups second_key_result = create_result(hot_and_cold_cfs); + + // GetEntity for first_key + ASSERT_OK(db_->GetEntity(ReadOptions(), first_key, &first_key_result)); + ASSERT_EQ(num_column_families, first_key_result.size()); + // We expect to get values for all keys and CFs + for (size_t i = 0; i < num_column_families; ++i) { + ASSERT_OK(first_key_result[i].status()); + } + // verify values for first key (default cf and hot cf) + ASSERT_EQ(first_default_columns, first_key_result[0].columns()); + ASSERT_EQ(first_hot_columns, first_key_result[1].columns()); + + // GetEntity for second_key + ASSERT_OK(db_->GetEntity(ReadOptions(), second_key, &second_key_result)); + ASSERT_EQ(num_column_families, second_key_result.size()); + // We expect to get values for all keys and CFs + for (size_t i = 0; i < num_column_families; ++i) { + ASSERT_OK(second_key_result[i].status()); + } + // verify values for second key (hot cf and cold cf) + ASSERT_EQ(second_hot_columns, second_key_result[0].columns()); + ASSERT_EQ(second_cold_columns, second_key_result[1].columns()); + } + { + // Case 2. Get first key and second key from all cfs. For the second key, we + // don't expect to get columns from default cf. 
+ constexpr size_t num_column_families = 3; + PinnableAttributeGroups first_key_result = create_result(all_cfs); + PinnableAttributeGroups second_key_result = create_result(all_cfs); + + // GetEntity for first_key + ASSERT_OK(db_->GetEntity(ReadOptions(), first_key, &first_key_result)); + ASSERT_EQ(num_column_families, first_key_result.size()); + // We expect to get values for all keys and CFs + for (size_t i = 0; i < num_column_families; ++i) { + ASSERT_OK(first_key_result[i].status()); + } + // verify values for first key + ASSERT_EQ(first_default_columns, first_key_result[0].columns()); + ASSERT_EQ(first_hot_columns, first_key_result[1].columns()); + ASSERT_EQ(first_cold_columns, first_key_result[2].columns()); + + // GetEntity for second_key + ASSERT_OK(db_->GetEntity(ReadOptions(), second_key, &second_key_result)); + ASSERT_EQ(num_column_families, second_key_result.size()); + // key does not exist in default cf + ASSERT_NOK(second_key_result[0].status()); + ASSERT_TRUE(second_key_result[0].status().IsNotFound()); + + // verify values for second key (hot cf and cold cf) + ASSERT_OK(second_key_result[1].status()); + ASSERT_OK(second_key_result[2].status()); + ASSERT_EQ(second_hot_columns, second_key_result[1].columns()); + ASSERT_EQ(second_cold_columns, second_key_result[2].columns()); + } +} + TEST_F(DBWideBasicTest, MultiCFMultiGetEntity) { Options options = GetDefaultOptions(); CreateAndReopenWithCF({"corinthian"}, options); @@ -274,9 +389,9 @@ TEST_F(DBWideBasicTest, MultiCFMultiGetEntityAsPinnableAttributeGroups) { Options options = GetDefaultOptions(); CreateAndReopenWithCF({"hot_cf", "cold_cf"}, options); - constexpr int DEFAULT_CF_HANDLE_INDEX = 0; - constexpr int HOT_CF_HANDLE_INDEX = 1; - constexpr int COLD_CF_HANDLE_INDEX = 2; + constexpr int kDefaultCfHandleIndex = 0; + constexpr int kHotCfHandleIndex = 1; + constexpr int kColdCfHandleIndex = 2; constexpr char first_key[] = "first"; WideColumns first_default_columns{ @@ -294,24 +409,24 @@ 
TEST_F(DBWideBasicTest, MultiCFMultiGetEntityAsPinnableAttributeGroups) { {"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}}; // TODO - update this to use the multi-attribute-group PutEntity when ready - ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[DEFAULT_CF_HANDLE_INDEX], + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex], first_key, first_default_columns)); - ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[HOT_CF_HANDLE_INDEX], + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex], first_key, first_hot_columns)); - ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[COLD_CF_HANDLE_INDEX], + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex], first_key, first_cold_columns)); - ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[HOT_CF_HANDLE_INDEX], + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex], second_key, second_hot_columns)); - ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[COLD_CF_HANDLE_INDEX], + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex], second_key, second_cold_columns)); constexpr size_t num_keys = 2; std::array<Slice, num_keys> keys = {first_key, second_key}; std::vector<ColumnFamilyHandle*> all_cfs = handles_; std::vector<ColumnFamilyHandle*> default_and_hot_cfs{ - {handles_[DEFAULT_CF_HANDLE_INDEX], handles_[HOT_CF_HANDLE_INDEX]}}; + {handles_[kDefaultCfHandleIndex], handles_[kHotCfHandleIndex]}}; std::vector<ColumnFamilyHandle*> hot_and_cold_cfs{ - {handles_[HOT_CF_HANDLE_INDEX], handles_[COLD_CF_HANDLE_INDEX]}}; + {handles_[kHotCfHandleIndex], handles_[kColdCfHandleIndex]}}; auto create_result = [](const std::vector<ColumnFamilyHandle*>& column_families) -> PinnableAttributeGroups { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a536a76c18..fae1729b12 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -602,6 +602,16 @@ class DB { return Status::NotSupported("GetEntity not supported"); } + // Returns logically grouped wide-column entities per column family (a.k.a. 
+ // attribute groups) for a single key. PinnableAttributeGroups is a vector of + // PinnableAttributeGroup. Each PinnableAttributeGroup will have + // ColumnFamilyHandle* as input, and Status and PinnableWideColumns as output. + virtual Status GetEntity(const ReadOptions& /* options */, + const Slice& /* key */, + PinnableAttributeGroups* /* result */) { + return Status::NotSupported("GetEntity not supported"); + } + // Populates the `merge_operands` array with all the merge operands in the DB // for `key`. The `merge_operands` array will be populated in the order of // insertion. The number of entries populated in `merge_operands` will be