mirror of https://github.com/facebook/rocksdb.git
AttributeGroups - PutEntity Implementation (#11977)
Summary: Write Path for AttributeGroup Support. The new `PutEntity()` API uses `WriteBatch` and atomically writes WideColumns entities in multiple Column Families. Combined the release note from PR https://github.com/facebook/rocksdb/issues/11925 Pull Request resolved: https://github.com/facebook/rocksdb/pull/11977 Test Plan: - `DBWideBasicTest::MultiCFMultiGetEntityAsPinnableAttributeGroups` updated - `WriteBatchTest::AttributeGroupTest` added - `WriteBatchTest::AttributeGroupSavePointTest` added Reviewed By: ltamasi Differential Revision: D50457122 Pulled By: jaykorean fbshipit-source-id: 4997b265e415588ce077933082dcd1ac3eeae2cd
This commit is contained in:
parent
92dc5f3e67
commit
2adef5367a
|
@ -197,6 +197,8 @@ class DBImpl : public DB {
|
|||
Status PutEntity(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const WideColumns& columns) override;
|
||||
Status PutEntity(const WriteOptions& options, const Slice& key,
|
||||
const AttributeGroups& attribute_groups) override;
|
||||
|
||||
using DB::Merge;
|
||||
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
|
||||
|
|
|
@ -53,6 +53,10 @@ class DBImplReadOnly : public DBImpl {
|
|||
const WideColumns& /* columns */) override {
|
||||
return Status::NotSupported("Not supported operation in read only mode.");
|
||||
}
|
||||
Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
|
||||
const AttributeGroups& /* attribute_groups */) override {
|
||||
return Status::NotSupported("Not supported operation in read only mode.");
|
||||
}
|
||||
|
||||
using DBImpl::Merge;
|
||||
virtual Status Merge(const WriteOptions& /*options*/,
|
||||
|
|
|
@ -136,6 +136,10 @@ class DBImplSecondary : public DBImpl {
|
|||
const WideColumns& /* columns */) override {
|
||||
return Status::NotSupported("Not supported operation in secondary mode.");
|
||||
}
|
||||
Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
|
||||
const AttributeGroups& /* attribute_groups */) override {
|
||||
return Status::NotSupported("Not supported operation in secondary mode.");
|
||||
}
|
||||
|
||||
using DBImpl::Merge;
|
||||
Status Merge(const WriteOptions& /*options*/,
|
||||
|
|
|
@ -48,6 +48,17 @@ Status DBImpl::PutEntity(const WriteOptions& options,
|
|||
return DB::PutEntity(options, column_family, key, columns);
|
||||
}
|
||||
|
||||
Status DBImpl::PutEntity(const WriteOptions& options, const Slice& key,
|
||||
const AttributeGroups& attribute_groups) {
|
||||
for (const AttributeGroup& ag : attribute_groups) {
|
||||
const Status s = FailIfCfHasTs(ag.column_family());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return DB::PutEntity(options, key, attribute_groups);
|
||||
}
|
||||
|
||||
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& val) {
|
||||
const Status s = FailIfCfHasTs(column_family);
|
||||
|
@ -2385,6 +2396,22 @@ Status DB::PutEntity(const WriteOptions& options,
|
|||
return Write(options, &batch);
|
||||
}
|
||||
|
||||
Status DB::PutEntity(const WriteOptions& options, const Slice& key,
|
||||
const AttributeGroups& attribute_groups) {
|
||||
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
|
||||
assert(default_cf);
|
||||
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
|
||||
assert(default_cf_ucmp);
|
||||
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
|
||||
options.protection_bytes_per_key,
|
||||
default_cf_ucmp->timestamp_size());
|
||||
const Status s = batch.PutEntity(key, attribute_groups);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
return Write(options, &batch);
|
||||
}
|
||||
|
||||
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
|
||||
const Slice& key) {
|
||||
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
|
||||
|
|
|
@ -260,17 +260,17 @@ TEST_F(DBWideBasicTest, GetEntityAsPinnableAttributeGroups) {
|
|||
WideColumns second_cold_columns{
|
||||
{"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}};
|
||||
|
||||
// TODO - update this to use the multi-attribute-group PutEntity when ready
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex],
|
||||
first_key, first_default_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
first_key, first_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
first_key, first_cold_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
second_key, second_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
second_key, second_cold_columns));
|
||||
AttributeGroups first_key_attribute_groups{
|
||||
AttributeGroup(handles_[kDefaultCfHandleIndex], first_default_columns),
|
||||
AttributeGroup(handles_[kHotCfHandleIndex], first_hot_columns),
|
||||
AttributeGroup(handles_[kColdCfHandleIndex], first_cold_columns)};
|
||||
AttributeGroups second_key_attribute_groups{
|
||||
AttributeGroup(handles_[kHotCfHandleIndex], second_hot_columns),
|
||||
AttributeGroup(handles_[kColdCfHandleIndex], second_cold_columns)};
|
||||
ASSERT_OK(
|
||||
db_->PutEntity(WriteOptions(), first_key, first_key_attribute_groups));
|
||||
ASSERT_OK(
|
||||
db_->PutEntity(WriteOptions(), second_key, second_key_attribute_groups));
|
||||
|
||||
std::vector<ColumnFamilyHandle*> all_cfs = handles_;
|
||||
std::vector<ColumnFamilyHandle*> default_and_hot_cfs{
|
||||
|
@ -408,17 +408,18 @@ TEST_F(DBWideBasicTest, MultiCFMultiGetEntityAsPinnableAttributeGroups) {
|
|||
WideColumns second_cold_columns{
|
||||
{"cold_cf_col_1_name", "second_key_cold_cf_col_1_value"}};
|
||||
|
||||
// TODO - update this to use the multi-attribute-group PutEntity when ready
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kDefaultCfHandleIndex],
|
||||
first_key, first_default_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
first_key, first_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
first_key, first_cold_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kHotCfHandleIndex],
|
||||
second_key, second_hot_columns));
|
||||
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[kColdCfHandleIndex],
|
||||
second_key, second_cold_columns));
|
||||
AttributeGroups first_key_attribute_groups{
|
||||
AttributeGroup(handles_[kDefaultCfHandleIndex], first_default_columns),
|
||||
AttributeGroup(handles_[kHotCfHandleIndex], first_hot_columns),
|
||||
AttributeGroup(handles_[kColdCfHandleIndex], first_cold_columns)};
|
||||
AttributeGroups second_key_attribute_groups{
|
||||
AttributeGroup(handles_[kHotCfHandleIndex], second_hot_columns),
|
||||
AttributeGroup(handles_[kColdCfHandleIndex], second_cold_columns)};
|
||||
|
||||
ASSERT_OK(
|
||||
db_->PutEntity(WriteOptions(), first_key, first_key_attribute_groups));
|
||||
ASSERT_OK(
|
||||
db_->PutEntity(WriteOptions(), second_key, second_key_attribute_groups));
|
||||
|
||||
constexpr size_t num_keys = 2;
|
||||
std::array<Slice, num_keys> keys = {first_key, second_key};
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include "rocksdb/write_batch.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <stack>
|
||||
|
@ -1016,6 +1017,22 @@ Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family,
|
|||
return WriteBatchInternal::PutEntity(this, cf_id, key, columns);
|
||||
}
|
||||
|
||||
Status WriteBatch::PutEntity(const Slice& key,
|
||||
const AttributeGroups& attribute_groups) {
|
||||
if (attribute_groups.empty()) {
|
||||
return Status::InvalidArgument(
|
||||
"Cannot call this method with empty attribute groups");
|
||||
}
|
||||
Status s;
|
||||
for (const AttributeGroup& ag : attribute_groups) {
|
||||
s = PutEntity(ag.column_family(), key, ag.columns());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
|
||||
b->rep_.push_back(static_cast<char>(kTypeNoop));
|
||||
return Status::OK();
|
||||
|
|
|
@ -12,7 +12,9 @@
|
|||
#include "db/column_family.h"
|
||||
#include "db/db_test_util.h"
|
||||
#include "db/memtable.h"
|
||||
#include "db/wide/wide_columns_helper.h"
|
||||
#include "db/write_batch_internal.h"
|
||||
#include "dbformat.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/env.h"
|
||||
|
@ -276,6 +278,21 @@ struct TestHandler : public WriteBatch::Handler {
|
|||
}
|
||||
return Status::OK();
|
||||
}
|
||||
Status PutEntityCF(uint32_t column_family_id, const Slice& key,
|
||||
const Slice& entity) override {
|
||||
std::ostringstream oss;
|
||||
Status s = WideColumnsHelper::DumpSliceAsWideColumns(entity, oss, false);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
if (column_family_id == 0) {
|
||||
seen += "PutEntity(" + key.ToString() + ", " + oss.str() + ")";
|
||||
} else {
|
||||
seen += "PutEntityCF(" + std::to_string(column_family_id) + ", " +
|
||||
key.ToString() + ", " + oss.str() + ")";
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
|
||||
if (column_family_id == 0) {
|
||||
seen += "Delete(" + key.ToString() + ")";
|
||||
|
@ -665,6 +682,82 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
|
|||
};
|
||||
} // anonymous namespace
|
||||
|
||||
TEST_F(WriteBatchTest, AttributeGroupTest) {
|
||||
WriteBatch batch;
|
||||
ColumnFamilyHandleImplDummy zero(0), two(2);
|
||||
AttributeGroups foo_ags;
|
||||
WideColumn zero_col_1{"0_c_1_n", "0_c_1_v"};
|
||||
WideColumn zero_col_2{"0_c_2_n", "0_c_2_v"};
|
||||
WideColumns zero_col_1_col_2{zero_col_1, zero_col_2};
|
||||
|
||||
WideColumn two_col_1{"2_c_1_n", "2_c_1_v"};
|
||||
WideColumn two_col_2{"2_c_2_n", "2_c_2_v"};
|
||||
WideColumns two_col_1_col_2{two_col_1, two_col_2};
|
||||
|
||||
foo_ags.emplace_back(&zero, zero_col_1_col_2);
|
||||
foo_ags.emplace_back(&two, two_col_1_col_2);
|
||||
|
||||
ASSERT_OK(batch.PutEntity("foo", foo_ags));
|
||||
|
||||
TestHandler handler;
|
||||
ASSERT_OK(batch.Iterate(&handler));
|
||||
ASSERT_EQ(
|
||||
"PutEntity(foo, 0_c_1_n:0_c_1_v "
|
||||
"0_c_2_n:0_c_2_v)"
|
||||
"PutEntityCF(2, foo, 2_c_1_n:2_c_1_v "
|
||||
"2_c_2_n:2_c_2_v)",
|
||||
handler.seen);
|
||||
}
|
||||
|
||||
TEST_F(WriteBatchTest, AttributeGroupSavePointTest) {
|
||||
WriteBatch batch;
|
||||
batch.SetSavePoint();
|
||||
|
||||
ColumnFamilyHandleImplDummy zero(0), two(2), three(3);
|
||||
AttributeGroups foo_ags;
|
||||
WideColumn zero_col_1{"0_c_1_n", "0_c_1_v"};
|
||||
WideColumn zero_col_2{"0_c_2_n", "0_c_2_v"};
|
||||
WideColumns zero_col_1_col_2{zero_col_1, zero_col_2};
|
||||
|
||||
WideColumn two_col_1{"2_c_1_n", "2_c_1_v"};
|
||||
WideColumn two_col_2{"2_c_2_n", "2_c_2_v"};
|
||||
WideColumns two_col_1_col_2{two_col_1, two_col_2};
|
||||
|
||||
foo_ags.emplace_back(&zero, zero_col_1_col_2);
|
||||
foo_ags.emplace_back(&two, two_col_1_col_2);
|
||||
|
||||
AttributeGroups bar_ags;
|
||||
WideColumn three_col_1{"3_c_1_n", "3_c_1_v"};
|
||||
WideColumn three_col_2{"3_c_2_n", "3_c_2_v"};
|
||||
WideColumns three_col_1_col_2{three_col_1, three_col_2};
|
||||
|
||||
bar_ags.emplace_back(&zero, zero_col_1_col_2);
|
||||
bar_ags.emplace_back(&three, three_col_1_col_2);
|
||||
|
||||
ASSERT_OK(batch.PutEntity("foo", foo_ags));
|
||||
batch.SetSavePoint();
|
||||
|
||||
ASSERT_OK(batch.PutEntity("bar", bar_ags));
|
||||
|
||||
TestHandler handler;
|
||||
ASSERT_OK(batch.Iterate(&handler));
|
||||
ASSERT_EQ(
|
||||
"PutEntity(foo, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
|
||||
"PutEntityCF(2, foo, 2_c_1_n:2_c_1_v 2_c_2_n:2_c_2_v)"
|
||||
"PutEntity(bar, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
|
||||
"PutEntityCF(3, bar, 3_c_1_n:3_c_1_v 3_c_2_n:3_c_2_v)",
|
||||
handler.seen);
|
||||
|
||||
ASSERT_OK(batch.RollbackToSavePoint());
|
||||
|
||||
handler.seen.clear();
|
||||
ASSERT_OK(batch.Iterate(&handler));
|
||||
ASSERT_EQ(
|
||||
"PutEntity(foo, 0_c_1_n:0_c_1_v 0_c_2_n:0_c_2_v)"
|
||||
"PutEntityCF(2, foo, 2_c_1_n:2_c_1_v 2_c_2_n:2_c_2_v)",
|
||||
handler.seen);
|
||||
}
|
||||
|
||||
TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
|
||||
WriteBatch batch;
|
||||
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
|
||||
|
|
|
@ -435,6 +435,10 @@ class DB {
|
|||
virtual Status PutEntity(const WriteOptions& options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const WideColumns& columns);
|
||||
// Split and store wide column entities in multiple column families (a.k.a.
|
||||
// AttributeGroups)
|
||||
virtual Status PutEntity(const WriteOptions& options, const Slice& key,
|
||||
const AttributeGroups& attribute_groups);
|
||||
|
||||
// Remove the database entry (if any) for "key". Returns OK on
|
||||
// success, and a non-OK status on error. It is not an error if "key"
|
||||
|
|
|
@ -92,6 +92,10 @@ class StackableDB : public DB {
|
|||
const WideColumns& columns) override {
|
||||
return db_->PutEntity(options, column_family, key, columns);
|
||||
}
|
||||
Status PutEntity(const WriteOptions& options, const Slice& key,
|
||||
const AttributeGroups& attribute_groups) override {
|
||||
return db_->PutEntity(options, key, attribute_groups);
|
||||
}
|
||||
|
||||
using DB::Get;
|
||||
virtual Status Get(const ReadOptions& options,
|
||||
|
|
|
@ -118,7 +118,16 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
|||
return Status::InvalidArgument(
|
||||
"Cannot call this method without a column family handle");
|
||||
}
|
||||
return Status::NotSupported(
|
||||
"PutEntity not supported by WriteBatchWithIndex");
|
||||
}
|
||||
|
||||
Status PutEntity(const Slice& /* key */,
|
||||
const AttributeGroups& attribute_groups) override {
|
||||
if (attribute_groups.empty()) {
|
||||
return Status::InvalidArgument(
|
||||
"Cannot call this method without attribute groups");
|
||||
}
|
||||
return Status::NotSupported(
|
||||
"PutEntity not supported by WriteBatchWithIndex");
|
||||
}
|
||||
|
@ -301,4 +310,3 @@ class WriteBatchWithIndex : public WriteBatchBase {
|
|||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
|
|
|
@ -221,8 +221,27 @@ inline bool operator!=(const PinnableWideColumns& lhs,
|
|||
}
|
||||
|
||||
// Class representing attribute group. Attribute group is a logical grouping of
|
||||
// wide-column entities by leveraging Column Families. Wide-columns returned
|
||||
// from the query are pinnable.
|
||||
// wide-column entities by leveraging Column Families.
|
||||
// Used in Write Path
|
||||
class AttributeGroup {
|
||||
public:
|
||||
ColumnFamilyHandle* column_family() const { return column_family_; }
|
||||
const WideColumns& columns() const { return columns_; }
|
||||
WideColumns& columns() { return columns_; }
|
||||
|
||||
explicit AttributeGroup(ColumnFamilyHandle* column_family,
|
||||
const WideColumns& columns)
|
||||
: column_family_(column_family), columns_(columns) {}
|
||||
|
||||
private:
|
||||
ColumnFamilyHandle* column_family_;
|
||||
WideColumns columns_;
|
||||
};
|
||||
|
||||
// A collection of Attribute Groups.
|
||||
using AttributeGroups = std::vector<AttributeGroup>;
|
||||
|
||||
// Used in Read Path. Wide-columns returned from the query are pinnable.
|
||||
class PinnableAttributeGroup {
|
||||
public:
|
||||
ColumnFamilyHandle* column_family() const { return column_family_; }
|
||||
|
@ -255,7 +274,7 @@ inline void PinnableAttributeGroup::Reset() {
|
|||
columns_.Reset();
|
||||
}
|
||||
|
||||
// A collection of Attribute Groups.
|
||||
// A collection of Pinnable Attribute Groups.
|
||||
using PinnableAttributeGroups = std::vector<PinnableAttributeGroup>;
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -106,6 +106,11 @@ class WriteBatch : public WriteBatchBase {
|
|||
Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const WideColumns& columns) override;
|
||||
|
||||
// Split and store wide column entities in multiple column families (a.k.a.
|
||||
// AttributeGroups)
|
||||
Status PutEntity(const Slice& key,
|
||||
const AttributeGroups& attribute_groups) override;
|
||||
|
||||
using WriteBatchBase::Delete;
|
||||
// If the database contains a mapping for "key", erase it. Else do nothing.
|
||||
// The following Delete(..., const Slice& key) can be used when user-defined
|
||||
|
|
|
@ -47,6 +47,11 @@ class WriteBatchBase {
|
|||
virtual Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const WideColumns& columns) = 0;
|
||||
|
||||
// Split and store wide column entities in multiple column families (a.k.a.
|
||||
// AttributeGroups)
|
||||
virtual Status PutEntity(const Slice& key,
|
||||
const AttributeGroups& attribute_groups) = 0;
|
||||
|
||||
// Merge "value" with the existing value of "key" in the database.
|
||||
// "key->merge(existing, value)"
|
||||
virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add GetEntity() and PutEntity() API implementation for Attribute Group support. Through the use of Column Families, AttributeGroup enables users to logically group wide-column entities.
|
Loading…
Reference in New Issue