Make queries return the value of the default column for wide-column entities (#10483)

Summary:
The patch adds support for wide-column entities to the existing query
APIs (`Get`, `MultiGet`, and iterator). Namely, when a wide-column entity
is encountered during a query, we return the value of its default
(anonymous) column as the result. Later, we plan to add wide-column
specific query APIs which will enable retrieving entire wide-column entities
or a subset of their columns.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10483

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D38441881

Pulled By: ltamasi

fbshipit-source-id: 6444e79a31aff2470e866698e3a97985bc2b3543
This commit is contained in:
Levi Tamasi 2022-08-08 16:10:08 -07:00 committed by Facebook GitHub Bot
parent a85443c001
commit 24bcab7d5d
8 changed files with 246 additions and 60 deletions

View File

@ -17,6 +17,7 @@
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/wide/wide_column_serialization.h"
#include "file/filename.h"
#include "logging/logging.h"
#include "memory/arena.h"
@ -75,6 +76,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
verify_checksums_(read_options.verify_checksums),
expose_blob_index_(expose_blob_index),
is_blob_(false),
is_wide_(false),
arena_mode_(arena_mode),
range_del_agg_(&ioptions.internal_comparator, s),
db_impl_(db_impl),
@ -132,6 +134,7 @@ void DBIter::Next() {
PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_);
// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
ResetWideColumnValue();
local_stats_.skip_count_ += num_internal_keys_skipped_;
local_stats_.skip_count_--;
num_internal_keys_skipped_ = 0;
@ -174,6 +177,8 @@ void DBIter::Next() {
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
const Slice& blob_index) {
assert(!is_blob_);
assert(!is_wide_);
assert(value_of_default_column_.empty());
if (expose_blob_index_) { // Stacked BlobDB implementation
is_blob_ = true;
@ -209,13 +214,24 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
return true;
}
// Handles a wide-column entity encountered by the iterator: extracts the value
// of the entity's default column into value_of_default_column_ and marks the
// current entry as wide (is_wide_ = true).
//
// Returns true on success. If extraction fails, the failing status is stored
// in status_, the iterator is invalidated (valid_ = false) and false is
// returned.
//
// NOTE(review): this span appears to show both the removed stub (which
// unconditionally reported NotSupported) and its replacement, hence the two
// signature lines and two bodies -- confirm against the actual file.
bool DBIter::SetWideColumnValueIfNeeded(const Slice& /* wide_columns_slice */) {
bool DBIter::SetWideColumnValueIfNeeded(const Slice& wide_columns_slice) {
// Preconditions: no blob or wide-column state may be left over from a
// previous entry.
assert(!is_blob_);
assert(!is_wide_);
assert(value_of_default_column_.empty());
// TODO: support wide-column entities
status_ = Status::NotSupported("Encountered unexpected wide-column entity");
valid_ = false;
return false;
// GetValueOfDefaultColumn takes its input by non-const reference, so work on
// a local copy of the slice rather than the caller's const reference.
Slice wide_columns_copy = wide_columns_slice;
const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
wide_columns_copy, value_of_default_column_);
if (!s.ok()) {
// Surface the failure through the iterator's status and invalidate it.
status_ = s;
valid_ = false;
return false;
}
is_wide_ = true;
return true;
}
// PRE: saved_key_ has the current user key if skipping_saved_key
@ -265,6 +281,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
bool reseek_done = false;
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
do {
// Will update is_key_seqnum_zero_ as soon as we parsed the current key
@ -592,7 +610,11 @@ bool DBIter::MergeValuesNewToOld() {
if (!s.ok()) {
return false;
}
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
// iter_ is positioned after put
iter_.Next();
if (!iter_.status().ok()) {
@ -638,6 +660,7 @@ void DBIter::Prev() {
PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_);
ReleaseTempPinnedData();
ResetWideColumnValue();
ResetInternalKeysSkippedCounter();
bool ok = true;
if (direction_ == kForward) {
@ -968,6 +991,9 @@ bool DBIter::FindValueForCurrentKey() {
Status s;
s.PermitUncheckedError();
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
@ -1006,7 +1032,11 @@ bool DBIter::FindValueForCurrentKey() {
if (!s.ok()) {
return false;
}
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
@ -1082,6 +1112,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// Find the next value that's visible.
ParsedInternalKey ikey;
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
while (true) {
if (!iter_.Valid()) {
valid_ = false;
@ -1216,7 +1249,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!s.ok()) {
return false;
}
is_blob_ = false;
assert(!is_wide_);
assert(value_of_default_column_.empty());
return true;
} else if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
@ -1441,6 +1478,7 @@ void DBIter::Seek(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetWideColumnValue();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
@ -1517,6 +1555,7 @@ void DBIter::SeekForPrev(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetWideColumnValue();
ResetInternalKeysSkippedCounter();
// Seek the inner iterator based on the target key.
@ -1574,6 +1613,7 @@ void DBIter::SeekToFirst() {
status_ = Status::OK();
direction_ = kForward;
ReleaseTempPinnedData();
ResetWideColumnValue();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;
@ -1621,6 +1661,7 @@ void DBIter::SeekToLast() {
*iterate_upper_bound_, /*a_has_ts=*/false, k,
/*b_has_ts=*/false)) {
ReleaseTempPinnedData();
ResetWideColumnValue();
PrevInternal(nullptr);
k = key();
@ -1640,6 +1681,7 @@ void DBIter::SeekToLast() {
status_ = Status::OK();
direction_ = kReverse;
ReleaseTempPinnedData();
ResetWideColumnValue();
ResetInternalKeysSkippedCounter();
ClearSavedValue();
is_key_seqnum_zero_ = false;

View File

@ -160,9 +160,12 @@ class DBIter final : public Iterator {
}
Slice value() const override {
assert(valid_);
assert(!is_blob_ || !is_wide_);
if (!expose_blob_index_ && is_blob_) {
return blob_value_;
} else if (is_wide_) {
return value_of_default_column_;
} else if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
@ -302,6 +305,11 @@ class DBIter final : public Iterator {
bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice);
// Drops any wide-column state held for the current entry: empties the cached
// default-column value and clears the is_wide_ flag.
void ResetWideColumnValue() {
  value_of_default_column_.clear();
  is_wide_ = false;
}
Status Merge(const Slice* val, const Slice& user_key);
const SliceTransform* prefix_extractor_;
@ -326,6 +334,7 @@ class DBIter final : public Iterator {
Slice pinned_value_;
// for prefix seek mode to support prev()
PinnableSlice blob_value_;
Slice value_of_default_column_;
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
@ -362,6 +371,7 @@ class DBIter final : public Iterator {
// the stacked BlobDB implementation is used, false otherwise.
bool expose_blob_index_;
bool is_blob_;
bool is_wide_;
bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;

View File

@ -21,6 +21,7 @@
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "db/wide/wide_column_serialization.h"
#include "logging/logging.h"
#include "memory/arena.h"
#include "memory/memory_usage.h"
@ -786,12 +787,6 @@ static bool SaveValue(void* arg, const char* entry) {
"Encounter unsupported blob value. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
}
} else {
assert(type == kTypeWideColumnEntity);
// TODO: support wide-column entities
*(s->status) =
Status::NotSupported("Encountered unexpected wide-column entity");
}
if (!s->status->ok()) {
@ -825,7 +820,17 @@ static bool SaveValue(void* arg, const char* entry) {
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
if (type != kTypeWideColumnEntity) {
assert(type == kTypeValue || type == kTypeBlobIndex);
s->value->assign(v.data(), v.size());
} else {
Slice value;
*(s->status) =
WideColumnSerialization::GetValueOfDefaultColumn(v, value);
if (s->status->ok()) {
s->value->assign(value.data(), value.size());
}
}
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();

View File

@ -22,15 +22,90 @@ class DBWideBasicTest : public DBTestBase {
TEST_F(DBWideBasicTest, PutEntity) {
Options options = GetDefaultOptions();
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
constexpr char second_key[] = "second";
constexpr char first_value_of_default_column[] = "hello";
auto verify = [&]() {
{
PinnableSlice result;
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
&result));
ASSERT_EQ(result, first_value_of_default_column);
}
{
PinnableSlice result;
ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key,
&result));
ASSERT_TRUE(result.empty());
}
{
constexpr size_t num_keys = 2;
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value_of_default_column);
ASSERT_OK(statuses[1]);
ASSERT_TRUE(values[1].empty());
}
{
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_value_of_default_column);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_TRUE(iter->value().empty());
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), second_key);
ASSERT_TRUE(iter->value().empty());
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), first_key);
ASSERT_EQ(iter->value(), first_value_of_default_column);
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
};
// Use the DB::PutEntity API
WideColumns first_columns{
{kDefaultWideColumnName, first_value_of_default_column},
{"attr_name1", "foo"},
{"attr_name2", "bar"}};
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
@ -38,7 +113,51 @@ TEST_F(DBWideBasicTest, PutEntity) {
batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// Note: currently, read APIs are supposed to return NotSupported
// Try reading from memtable
verify();
// Try reading after recovery
Close();
options.avoid_flush_during_recovery = true;
Reopen(options);
verify();
// Try reading from storage
ASSERT_OK(Flush());
verify();
}
// Checks that wide-column entities can be written through a column family
// handle other than the default one, using both DB::PutEntity and
// WriteBatch::PutEntity. Only the write statuses are asserted; nothing is read
// back here.
TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
Options options = GetDefaultOptions();
// Create an additional column family; handles_[1] below is presumably its
// handle -- verify against the test fixture.
CreateAndReopenWithCF({"corinthian"}, options);
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(
db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns));
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) {
Options options = GetDefaultOptions();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);
constexpr char first_key[] = "first";
constexpr char second_key[] = "second";
// Note: Merge is currently not supported for wide-column entities
auto verify = [&]() {
{
PinnableSlice result;
@ -84,26 +203,23 @@ TEST_F(DBWideBasicTest, PutEntity) {
}
};
// Try reading from memtable
verify();
// Use the DB::PutEntity API
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
// Try reading after recovery
Close();
options.avoid_flush_during_recovery = true;
Reopen(options);
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
verify();
// Use WriteBatch
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(
batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// Try reading from storage
ASSERT_OK(Flush());
verify();
// Add a couple of merge operands
Close();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);
constexpr char merge_operand[] = "bla";
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
@ -111,15 +227,15 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
// Try reading from memtable
// Try reading when PutEntity is in storage, Merge is in memtable
verify();
// Try reading from storage
// Try reading when PutEntity and Merge are both in storage
ASSERT_OK(Flush());
verify();
// Do it again, with the Put and the Merge in the same memtable
// Try reading when PutEntity and Merge are both in memtable
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
@ -128,30 +244,9 @@ TEST_F(DBWideBasicTest, PutEntity) {
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
// Try reading from memtable
verify();
}
// Checks that wide-column entities can be written through a column family
// handle other than the default one, using both DB::PutEntity and
// WriteBatch::PutEntity. Only the write statuses are asserted; nothing is read
// back here.
TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
Options options = GetDefaultOptions();
// Create an additional column family; handles_[1] below is presumably its
// handle -- verify against the test fixture.
CreateAndReopenWithCF({"corinthian"}, options);
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(
db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns));
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
TEST_F(DBWideBasicTest, PutEntityTimestampError) {
// Note: timestamps are currently not supported

View File

@ -15,6 +15,8 @@
namespace ROCKSDB_NAMESPACE {
const Slice kDefaultWideColumnName;
Status WideColumnSerialization::Serialize(const WideColumns& columns,
std::string& output) {
if (columns.size() >
@ -137,4 +139,23 @@ WideColumns::const_iterator WideColumnSerialization::Find(
return it;
}
// Deserializes the wide-column entity in `input` and stores the value of its
// default column in `value`.
//
// If deserialization fails, that status is returned and `value` is left
// untouched. If the entity has no columns, or its first column is not named
// kDefaultWideColumnName, `value` is cleared and OK is returned.
//
// `input` is passed on to Deserialize by non-const reference.
Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input,
                                                        Slice& value) {
  WideColumns deserialized;

  const Status status = Deserialize(input, deserialized);
  if (!status.ok()) {
    return status;
  }

  // Only the first column is inspected when looking for the default column.
  const bool has_default_column =
      !deserialized.empty() &&
      !(deserialized.front().name() != kDefaultWideColumnName);

  if (has_default_column) {
    value = deserialized.front().value();
  } else {
    value.clear();
  }

  return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -48,6 +48,7 @@ class WideColumnSerialization {
static WideColumns::const_iterator Find(const WideColumns& columns,
const Slice& column_name);
static Status GetValueOfDefaultColumn(Slice& input, Slice& value);
static constexpr uint32_t kCurrentVersion = 1;
};

View File

@ -71,4 +71,6 @@ inline bool operator!=(const WideColumn& lhs, const WideColumn& rhs) {
using WideColumns = std::vector<WideColumn>;
extern const Slice kDefaultWideColumnName;
} // namespace ROCKSDB_NAMESPACE

View File

@ -9,6 +9,7 @@
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/read_callback.h"
#include "db/wide/wide_column_serialization.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
@ -258,10 +259,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kUnexpectedBlobIndex;
return false;
}
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false;
}
if (is_blob_index_ != nullptr) {
@ -272,14 +269,27 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
state_ = kFound;
if (do_merge_) {
if (LIKELY(pinnable_val_ != nullptr)) {
Slice value_to_use = value;
if (type == kTypeWideColumnEntity) {
Slice value_copy = value;
if (!WideColumnSerialization::GetValueOfDefaultColumn(
value_copy, value_to_use)
.ok()) {
state_ = kCorrupt;
return false;
}
}
if (LIKELY(value_pinner != nullptr)) {
// If the backing resources for the value are provided, pin them
pinnable_val_->PinSlice(value, value_pinner);
pinnable_val_->PinSlice(value_to_use, value_pinner);
} else {
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
this);
// Otherwise copy the value
pinnable_val_->PinSelf(value);
pinnable_val_->PinSelf(value_to_use);
}
}
} else {