mirror of https://github.com/facebook/rocksdb.git
AttributeGroups - GetEntity Implementation (#11943)
Summary: Implementation of `GetEntity()` API that returns wide-column entities as AttributeGroups from multiple column families for a single key. Regarding the definition of Attribute groups, please see the detailed example description in PR https://github.com/facebook/rocksdb/issues/11925 Pull Request resolved: https://github.com/facebook/rocksdb/pull/11943 Test Plan: - `DBWideBasicTest::GetEntityAsPinnableAttributeGroups` added will enable the new API in the `db_stress` after merging Reviewed By: ltamasi Differential Revision: D50195794 Pulled By: jaykorean fbshipit-source-id: 218d54841ac7e337de62e13b1233b0a99bd91af3
This commit is contained in:
parent
2dab137182
commit
0ecfc4fbb4
|
@ -2057,6 +2057,53 @@ Status DBImpl::GetEntity(const ReadOptions& _read_options,
|
|||
return GetImpl(read_options, key, get_impl_options);
|
||||
}
|
||||
|
||||
Status DBImpl::GetEntity(const ReadOptions& _read_options, const Slice& key,
|
||||
PinnableAttributeGroups* result) {
|
||||
if (!result) {
|
||||
return Status::InvalidArgument(
|
||||
"Cannot call GetEntity without PinnableAttributeGroups object");
|
||||
}
|
||||
const size_t num_column_families = result->size();
|
||||
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
|
||||
_read_options.io_activity != Env::IOActivity::kGetEntity) {
|
||||
Status s = Status::InvalidArgument(
|
||||
"Cannot call GetEntity with `ReadOptions::io_activity` != "
|
||||
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGetEntity`");
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
(*result)[i].SetStatus(s);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
// return early if no CF was passed in
|
||||
if (num_column_families == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
ReadOptions read_options(_read_options);
|
||||
if (read_options.io_activity == Env::IOActivity::kUnknown) {
|
||||
read_options.io_activity = Env::IOActivity::kGetEntity;
|
||||
}
|
||||
std::vector<Slice> keys;
|
||||
std::vector<ColumnFamilyHandle*> column_families;
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
// Adding the same key slice for different CFs
|
||||
keys.emplace_back(key);
|
||||
column_families.emplace_back((*result)[i].column_family());
|
||||
}
|
||||
std::vector<PinnableWideColumns> columns(num_column_families);
|
||||
std::vector<Status> statuses(num_column_families);
|
||||
MultiGetCommon(
|
||||
read_options, num_column_families, column_families.data(), keys.data(),
|
||||
/* values */ nullptr, columns.data(),
|
||||
/* timestamps */ nullptr, statuses.data(), /* sorted_input */ false);
|
||||
// Set results
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
(*result)[i].Reset();
|
||||
(*result)[i].SetStatus(statuses[i]);
|
||||
(*result)[i].SetColumns(std::move(columns[i]));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
|
||||
// If both thresholds are reached, a function returning merge operands as
|
||||
// `PinnableSlice`s should reference the `SuperVersion` to avoid large and/or
|
||||
|
|
|
@ -242,6 +242,8 @@ class DBImpl : public DB {
|
|||
Status GetEntity(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
PinnableWideColumns* columns) override;
|
||||
Status GetEntity(const ReadOptions& options, const Slice& key,
|
||||
PinnableAttributeGroups* result) override;
|
||||
|
||||
using DB::GetMergeOperands;
|
||||
Status GetMergeOperands(const ReadOptions& options,
|
||||
|
|
|
@ -236,6 +236,121 @@ TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
|
|||
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
||||
}
|
||||
|
||||
TEST_F(DBWideBasicTest, GetEntityAsPinnableAttributeGroups) {
|
||||
Options options = GetDefaultOptions();
|
||||
CreateAndReopenWithCF({"hot_cf", "cold_cf"}, options);
|
||||
|
||||
constexpr int kDefaultCfHandleIndex = 0;
|
||||
constexpr int kHotCfHandleIndex = 1;
|
||||
constexpr int kColdCfHandleIndex = 2;
|
||||
|
||||
constexpr char first_key[] = "first";
|
||||
WideColumns first_default_columns{
|
||||
{"default_cf_col_1_name", "first_key_default_cf_col_1_value"},
|
||||
{"default_cf_col_2_name", "first_key_default_cf_col_2_value"}};
|
||||
WideColumns first_hot_columns{
|
||||
{"hot_cf_col_1_name", "first_key_hot_cf_col_1_value"},
|
||||
{"hot_cf_col_2_name", "first_key_hot_cf_col_2_value"}};
|
||||
WideColumns first_cold_columns{
|
||||
{"cold_cf_col_1_name", "first_key_cold_cf_col_1_value"}};
|
||||
|
||||
constexpr char second_key[] = "second";
|
||||
WideColumns second_hot_columns{
|
||||
{"hot_cf_col_1_name", "second_key_hot_cf_col_1_value"}};
|
||||
WideColumns second_cold_columns{
|
||||
{"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}};
|
||||
|
||||
// TODO - update this to use the multi-attribute-group PutEntity when ready
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex],
|
||||
first_key, first_default_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
first_key, first_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
first_key, first_cold_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
second_key, second_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
second_key, second_cold_columns));
|
||||
|
||||
std::vector<ColumnFamilyHandle*> all_cfs = handles_;
|
||||
std::vector<ColumnFamilyHandle*> default_and_hot_cfs{
|
||||
{handles_[kDefaultCfHandleIndex], handles_[kHotCfHandleIndex]}};
|
||||
std::vector<ColumnFamilyHandle*> hot_and_cold_cfs{
|
||||
{handles_[kHotCfHandleIndex], handles_[kColdCfHandleIndex]}};
|
||||
auto create_result =
|
||||
[](const std::vector<ColumnFamilyHandle*>& column_families)
|
||||
-> PinnableAttributeGroups {
|
||||
PinnableAttributeGroups result;
|
||||
for (size_t i = 0; i < column_families.size(); ++i) {
|
||||
result.emplace_back(column_families[i]);
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
{
|
||||
// Case 1. Get first key from default cf and hot_cf and second key from
|
||||
// hot_cf and cold_cf
|
||||
constexpr size_t num_column_families = 2;
|
||||
PinnableAttributeGroups first_key_result =
|
||||
create_result(default_and_hot_cfs);
|
||||
PinnableAttributeGroups second_key_result = create_result(hot_and_cold_cfs);
|
||||
|
||||
// GetEntity for first_key
|
||||
ASSERT_OK(db_->GetEntity(ReadOptions(), first_key, &first_key_result));
|
||||
ASSERT_EQ(num_column_families, first_key_result.size());
|
||||
// We expect to get values for all keys and CFs
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
ASSERT_OK(first_key_result[i].status());
|
||||
}
|
||||
// verify values for first key (default cf and hot cf)
|
||||
ASSERT_EQ(first_default_columns, first_key_result[0].columns());
|
||||
ASSERT_EQ(first_hot_columns, first_key_result[1].columns());
|
||||
|
||||
// GetEntity for second_key
|
||||
ASSERT_OK(db_->GetEntity(ReadOptions(), second_key, &second_key_result));
|
||||
ASSERT_EQ(num_column_families, second_key_result.size());
|
||||
// We expect to get values for all keys and CFs
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
ASSERT_OK(second_key_result[i].status());
|
||||
}
|
||||
// verify values for second key (hot cf and cold cf)
|
||||
ASSERT_EQ(second_hot_columns, second_key_result[0].columns());
|
||||
ASSERT_EQ(second_cold_columns, second_key_result[1].columns());
|
||||
}
|
||||
{
|
||||
// Case 2. Get first key and second key from all cfs. For the second key, we
|
||||
// don't expect to get columns from default cf.
|
||||
constexpr size_t num_column_families = 3;
|
||||
PinnableAttributeGroups first_key_result = create_result(all_cfs);
|
||||
PinnableAttributeGroups second_key_result = create_result(all_cfs);
|
||||
|
||||
// GetEntity for first_key
|
||||
ASSERT_OK(db_->GetEntity(ReadOptions(), first_key, &first_key_result));
|
||||
ASSERT_EQ(num_column_families, first_key_result.size());
|
||||
// We expect to get values for all keys and CFs
|
||||
for (size_t i = 0; i < num_column_families; ++i) {
|
||||
ASSERT_OK(first_key_result[i].status());
|
||||
}
|
||||
// verify values for first key
|
||||
ASSERT_EQ(first_default_columns, first_key_result[0].columns());
|
||||
ASSERT_EQ(first_hot_columns, first_key_result[1].columns());
|
||||
ASSERT_EQ(first_cold_columns, first_key_result[2].columns());
|
||||
|
||||
// GetEntity for second_key
|
||||
ASSERT_OK(db_->GetEntity(ReadOptions(), second_key, &second_key_result));
|
||||
ASSERT_EQ(num_column_families, second_key_result.size());
|
||||
// key does not exist in default cf
|
||||
ASSERT_NOK(second_key_result[0].status());
|
||||
ASSERT_TRUE(second_key_result[0].status().IsNotFound());
|
||||
|
||||
// verify values for second key (hot cf and cold cf)
|
||||
ASSERT_OK(second_key_result[1].status());
|
||||
ASSERT_OK(second_key_result[2].status());
|
||||
ASSERT_EQ(second_hot_columns, second_key_result[1].columns());
|
||||
ASSERT_EQ(second_cold_columns, second_key_result[2].columns());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBWideBasicTest, MultiCFMultiGetEntity) {
|
||||
Options options = GetDefaultOptions();
|
||||
CreateAndReopenWithCF({"corinthian"}, options);
|
||||
|
@ -274,9 +389,9 @@ TEST_F(DBWideBasicTest, MultiCFMultiGetEntityAsPinnableAttributeGroups) {
|
|||
Options options = GetDefaultOptions();
|
||||
CreateAndReopenWithCF({"hot_cf", "cold_cf"}, options);
|
||||
|
||||
constexpr int DEFAULT_CF_HANDLE_INDEX = 0;
|
||||
constexpr int HOT_CF_HANDLE_INDEX = 1;
|
||||
constexpr int COLD_CF_HANDLE_INDEX = 2;
|
||||
constexpr int kDefaultCfHandleIndex = 0;
|
||||
constexpr int kHotCfHandleIndex = 1;
|
||||
constexpr int kColdCfHandleIndex = 2;
|
||||
|
||||
constexpr char first_key[] = "first";
|
||||
WideColumns first_default_columns{
|
||||
|
@ -294,24 +409,24 @@ TEST_F(DBWideBasicTest, MultiCFMultiGetEntityAsPinnableAttributeGroups) {
|
|||
{"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}};
|
||||
|
||||
// TODO - update this to use the multi-attribute-group PutEntity when ready
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[DEFAULT_CF_HANDLE_INDEX],
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex],
|
||||
first_key, first_default_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[HOT_CF_HANDLE_INDEX],
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
first_key, first_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[COLD_CF_HANDLE_INDEX],
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
first_key, first_cold_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[HOT_CF_HANDLE_INDEX],
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
second_key, second_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[COLD_CF_HANDLE_INDEX],
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
second_key, second_cold_columns));
|
||||
|
||||
constexpr size_t num_keys = 2;
|
||||
std::array<Slice, num_keys> keys = {first_key, second_key};
|
||||
std::vector<ColumnFamilyHandle*> all_cfs = handles_;
|
||||
std::vector<ColumnFamilyHandle*> default_and_hot_cfs{
|
||||
{handles_[DEFAULT_CF_HANDLE_INDEX], handles_[HOT_CF_HANDLE_INDEX]}};
|
||||
{handles_[kDefaultCfHandleIndex], handles_[kHotCfHandleIndex]}};
|
||||
std::vector<ColumnFamilyHandle*> hot_and_cold_cfs{
|
||||
{handles_[HOT_CF_HANDLE_INDEX], handles_[COLD_CF_HANDLE_INDEX]}};
|
||||
{handles_[kHotCfHandleIndex], handles_[kColdCfHandleIndex]}};
|
||||
auto create_result =
|
||||
[](const std::vector<ColumnFamilyHandle*>& column_families)
|
||||
-> PinnableAttributeGroups {
|
||||
|
|
|
@ -602,6 +602,16 @@ class DB {
|
|||
return Status::NotSupported("GetEntity not supported");
|
||||
}
|
||||
|
||||
// Returns logically grouped wide-column entities per column family (a.k.a.
|
||||
// attribute groups) for a single key. PinnableAttributeGroups is a vector of
|
||||
// PinnableAttributeGroup. Each PinnableAttributeGroup will have
|
||||
// ColumnFamilyHandle* as input, and Status and PinnableWideColumns as output.
|
||||
virtual Status GetEntity(const ReadOptions& /* options */,
|
||||
const Slice& /* key */,
|
||||
PinnableAttributeGroups* /* result */) {
|
||||
return Status::NotSupported("GetEntity not supported");
|
||||
}
|
||||
|
||||
// Populates the `merge_operands` array with all the merge operands in the DB
|
||||
// for `key`. The `merge_operands` array will be populated in the order of
|
||||
// insertion. The number of entries populated in `merge_operands` will be
|
||||
|
|
Loading…
Reference in New Issue