Implement WriteBatchWithIndex::GetEntityFromBatch (#12424)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12424

The PR adds a wide-column point lookup API `GetEntityFromBatch` to `WriteBatchWithIndex`. Similarly to APIs like `DB::GetEntity`, this new API returns wide-column entities as-is, and wraps plain values in an entity with a single column (the anonymous default column). Also, similarly to `WriteBatchWithIndex::GetFromBatch`, it only reads data from the batch itself.

Reviewed By: jaykorean

Differential Revision: D54826535

fbshipit-source-id: 92604f3ebd90fe1afbd36f2d2194b7dee0011efa
This commit is contained in:
Levi Tamasi 2024-03-14 10:45:49 -07:00 committed by Facebook GitHub Bot
parent ba022dd44c
commit 7c290f72b8
6 changed files with 285 additions and 16 deletions

View file

@ -229,7 +229,18 @@ class WriteBatchWithIndex : public WriteBatchBase {
return GetFromBatch(nullptr, options, key, value); return GetFromBatch(nullptr, options, key, value);
} }
// TODO: implement GetEntityFromBatch // If the batch contains an entry for "key" in "column_family", return it as a
// wide-column entity in "*columns". If the entry is a wide-column entity,
// return it as-is; if it is a plain key-value, return it as an entity with a
// single anonymous column (see kDefaultWideColumnName) which contains the
// value.
//
// Returns OK on success, NotFound if there is no mapping for "key",
// MergeInProgress if the key has merge operands but the base value cannot be
// resolved based on the batch, or some error status (e.g. Corruption
// or InvalidArgument) on failure.
Status GetEntityFromBatch(ColumnFamilyHandle* column_family, const Slice& key,
PinnableWideColumns* columns);
// Similar to DB::Get() but will also read writes from this batch. // Similar to DB::Get() but will also read writes from this batch.
// //

View file

@ -0,0 +1 @@
* `WriteBatchWithIndex` now supports wide-column point lookups via the `GetEntityFromBatch` API. See the API comments for more details.

View file

@ -460,6 +460,26 @@ Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
void WriteBatchWithIndex::Clear() { rep->Clear(); } void WriteBatchWithIndex::Clear() { rep->Clear(); }
namespace {
// Converts the outcome of a batch-only lookup into the Status handed back to
// the caller.
//
// kDeleted and kNotFound both become NotFound, and kMergeInProgress becomes
// MergeInProgress; in those cases the incoming status "s" is discarded after
// being marked as permitted-unchecked. For kFound and kError, "s" is returned
// unchanged.
Status PostprocessStatusBatchOnly(const Status& s,
                                  WBWIIteratorImpl::Result result) {
  switch (result) {
    case WBWIIteratorImpl::kDeleted:
    case WBWIIteratorImpl::kNotFound:
      s.PermitUncheckedError();
      return Status::NotFound();

    case WBWIIteratorImpl::kMergeInProgress:
      s.PermitUncheckedError();
      return Status::MergeInProgress();

    default:
      break;
  }

  // Only the outcomes that carry a meaningful status may reach this point.
  assert(result == WBWIIteratorImpl::kFound ||
         result == WBWIIteratorImpl::kError);

  return s;
}
} // anonymous namespace
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
const DBOptions& /* options */, const DBOptions& /* options */,
const Slice& key, std::string* value) { const Slice& key, std::string* value) {
@ -468,23 +488,28 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,
auto result = WriteBatchWithIndexInternal::GetFromBatch( auto result = WriteBatchWithIndexInternal::GetFromBatch(
this, column_family, key, &merge_context, value, &s); this, column_family, key, &merge_context, value, &s);
switch (result) { return PostprocessStatusBatchOnly(s, result);
case WBWIIteratorImpl::kFound:
case WBWIIteratorImpl::kError:
// use returned status
break;
case WBWIIteratorImpl::kDeleted:
case WBWIIteratorImpl::kNotFound:
s = Status::NotFound();
break;
case WBWIIteratorImpl::kMergeInProgress:
s = Status::MergeInProgress();
break;
default:
assert(false);
} }
return s; Status WriteBatchWithIndex::GetEntityFromBatch(
ColumnFamilyHandle* column_family, const Slice& key,
PinnableWideColumns* columns) {
// Batch-only wide-column point lookup: validates the arguments, delegates the
// actual lookup to WriteBatchWithIndexInternal::GetEntityFromBatch, and then
// maps the lookup result to the returned Status via
// PostprocessStatusBatchOnly.

// A null column family handle is rejected with InvalidArgument.
if (!column_family) {
return Status::InvalidArgument(
"Cannot call GetEntityFromBatch without a column family handle");
}
// A null output object is rejected with InvalidArgument as well.
if (!columns) {
return Status::InvalidArgument(
"Cannot call GetEntityFromBatch without a PinnableWideColumns object");
}
// The merge context is local to this call; it is handed to the internal
// lookup together with the status it fills in.
MergeContext merge_context;
Status s;
auto result = WriteBatchWithIndexInternal::GetEntityFromBatch(
this, column_family, key, &merge_context, columns, &s);
return PostprocessStatusBatchOnly(s, result);
} }
Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db,

View file

@ -802,4 +802,88 @@ WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch(
return result; return result;
} }
// Looks up the latest update to "key" in "batch" for "column_family" and, if
// a value can be produced from the batch alone, stores it in "*columns".
//
// "*columns" is reset and "*s" is set to OK before the lookup. Outcomes:
//  - kError: the iterator reported an unexpected entry ("*s" is Corruption),
//    or applying the merge operands / deserializing the entity failed ("*s"
//    carries that failure).
//  - kNotFound: returned as-is.
//  - kFound: a Put or PutEntity was found; it is returned as an entity, with
//    any merge operands collected in "*context" applied on top of it. A
//    Delete followed by merge operands also ends up here after merging
//    without a base value.
//  - kDeleted: a Delete with no merge operands.
//  - kMergeInProgress: returned as-is.
//
// All pointer arguments must be non-null.
WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetEntityFromBatch(
    WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family,
    const Slice& key, MergeContext* context, PinnableWideColumns* columns,
    Status* s) {
  assert(batch);
  assert(column_family);
  assert(context);
  assert(columns);
  assert(s);

  // Start from a clean slate so stale output never leaks to the caller.
  columns->Reset();
  *s = Status::OK();

  std::unique_ptr<WBWIIteratorImpl> iter(
      static_cast_with_check<WBWIIteratorImpl>(
          batch->NewIterator(column_family)));
  iter->Seek(key);

  const auto outcome = iter->FindLatestUpdate(key, context);

  switch (outcome) {
    case WBWIIteratorImpl::kError:
      *s = Status::Corruption("Unexpected entry in WriteBatchWithIndex:",
                              std::to_string(iter->Entry().type));
      return outcome;

    case WBWIIteratorImpl::kNotFound:
      return outcome;

    case WBWIIteratorImpl::kFound: {  // Put/PutEntity
      WriteEntry latest = iter->Entry();
      Slice base = latest.value;
      const bool has_operands = context->GetNumOperands() > 0;

      if (latest.type == kPutRecord) {
        if (has_operands) {
          *s = MergeKeyWithBaseValue(
              column_family, key, MergeHelper::kPlainBaseValue, base, *context,
              static_cast<std::string*>(nullptr), columns);
        } else {
          // A plain value is exposed as a single-column entity.
          columns->SetPlainValue(base);
        }
      } else {
        assert(latest.type == kPutEntityRecord);

        if (has_operands) {
          *s = MergeKeyWithBaseValue(
              column_family, key, MergeHelper::kWideBaseValue, base, *context,
              static_cast<std::string*>(nullptr), columns);
        } else {
          *s = columns->SetWideColumnValue(base);
        }
      }

      // "*s" is still OK on the plain-value path without operands.
      if (!s->ok()) {
        return WBWIIteratorImpl::kError;
      }

      return outcome;
    }

    case WBWIIteratorImpl::kDeleted: {
      if (context->GetNumOperands() == 0) {
        // Nothing was written after the deletion.
        return outcome;
      }

      // Operands on top of a deletion are merged without a base value.
      *s = MergeKeyWithNoBaseValue(column_family, key, *context,
                                   static_cast<std::string*>(nullptr), columns);
      if (s->ok()) {
        return WBWIIteratorImpl::kFound;
      }

      return WBWIIteratorImpl::kError;
    }

    default:
      break;
  }

  assert(outcome == WBWIIteratorImpl::kMergeInProgress);

  return outcome;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View file

@ -449,6 +449,11 @@ class WriteBatchWithIndexInternal {
const Slice& key, MergeContext* merge_context, std::string* value, const Slice& key, MergeContext* merge_context, std::string* value,
Status* s); Status* s);
// Wide-column counterpart of GetFromBatch: finds the latest update to "key"
// in "batch" for "column_family" and, when a result can be produced from the
// batch alone, stores it in "*columns" (plain values are stored via
// SetPlainValue, entities via SetWideColumnValue). Merge operands gathered in
// "*merge_context" are applied on top of a Put/PutEntity base value, or
// without a base value when the latest non-merge update is a Delete.
//
// "*columns" is reset and "*s" is set to OK up front; on kError, "*s" holds
// the failure. All pointer arguments must be non-null (asserted in the
// implementation).
static WBWIIteratorImpl::Result GetEntityFromBatch(
WriteBatchWithIndex* batch, ColumnFamilyHandle* column_family,
const Slice& key, MergeContext* merge_context,
PinnableWideColumns* columns, Status* s);
private: private:
static Status CheckAndGetImmutableOptions(ColumnFamilyHandle* column_family, static Status CheckAndGetImmutableOptions(ColumnFamilyHandle* column_family,
const ImmutableOptions** ioptions); const ImmutableOptions** ioptions);

View file

@ -13,6 +13,7 @@
#include <memory> #include <memory>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/wide/wide_columns_helper.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -2766,6 +2767,148 @@ TEST_P(WriteBatchWithIndexTest, WideColumnsBatchAndDB) {
ASSERT_FALSE(iter->Valid()); ASSERT_FALSE(iter->Valid());
} }
// Exercises WriteBatchWithIndex::GetEntityFromBatch against the default column
// family. Each scenario below uses its own key, so the entries accumulated in
// the batch by earlier scenarios do not affect later ones.
TEST_P(WriteBatchWithIndexTest, GetEntityFromBatch) {
  ASSERT_OK(OpenDB());

  // No base value, no merges => NotFound
  {
    constexpr char key[] = "a";

    PinnableWideColumns result;
    ASSERT_TRUE(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
            .IsNotFound());
  }

  // No base value, with merges => MergeInProgress
  {
    constexpr char key[] = "b";
    constexpr char merge_op1[] = "bv1";
    constexpr char merge_op2[] = "bv2";

    // Use the "key" constant (rather than repeating the literal) so the
    // writes and the lookup cannot drift apart.
    ASSERT_OK(batch_->Merge(key, merge_op1));
    ASSERT_OK(batch_->Merge(key, merge_op2));

    PinnableWideColumns result;
    ASSERT_TRUE(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
            .IsMergeInProgress());
  }

  // Plain value, no merges => Found
  {
    constexpr char key[] = "c";
    constexpr char value[] = "cv";

    ASSERT_OK(batch_->Put(key, value));

    PinnableWideColumns result;
    ASSERT_OK(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));

    // The plain value comes back wrapped in the anonymous default column.
    const WideColumns expected{{kDefaultWideColumnName, value}};
    ASSERT_EQ(result.columns(), expected);
  }

  // Wide-column value, no merges => Found
  {
    constexpr char key[] = "d";
    const WideColumns columns{
        {kDefaultWideColumnName, "d0v"}, {"1", "d1v"}, {"2", "d2v"}};

    ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, columns));

    PinnableWideColumns result;
    ASSERT_OK(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));

    // The entity is returned as-is.
    ASSERT_EQ(result.columns(), columns);
  }

  // Plain value, with merges => Found
  {
    constexpr char key[] = "e";
    constexpr char base_value[] = "ev0";
    constexpr char merge_op1[] = "ev1";
    constexpr char merge_op2[] = "ev2";

    ASSERT_OK(batch_->Put(key, base_value));
    ASSERT_OK(batch_->Merge(key, merge_op1));
    ASSERT_OK(batch_->Merge(key, merge_op2));

    PinnableWideColumns result;
    ASSERT_OK(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));

    const WideColumns expected{{kDefaultWideColumnName, "ev0,ev1,ev2"}};
    ASSERT_EQ(result.columns(), expected);
  }

  // Wide-column value, with merges => Found
  {
    constexpr char key[] = "f";
    const WideColumns base_columns{
        {kDefaultWideColumnName, "f0v0"}, {"1", "f1v"}, {"2", "f2v"}};
    constexpr char merge_op1[] = "f0v1";
    constexpr char merge_op2[] = "f0v2";

    ASSERT_OK(batch_->PutEntity(db_->DefaultColumnFamily(), key, base_columns));
    ASSERT_OK(batch_->Merge(key, merge_op1));
    ASSERT_OK(batch_->Merge(key, merge_op2));

    PinnableWideColumns result;
    ASSERT_OK(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));

    // Only the default column is expected to change; the other columns are
    // expected to match the base entity.
    const WideColumns expected{{kDefaultWideColumnName, "f0v0,f0v1,f0v2"},
                               base_columns[1],
                               base_columns[2]};
    ASSERT_EQ(result.columns(), expected);
  }

  // Delete, no merges => NotFound
  {
    constexpr char key[] = "g";

    ASSERT_OK(batch_->Delete(key));

    PinnableWideColumns result;
    ASSERT_TRUE(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result)
            .IsNotFound());
  }

  // Delete, with merges => Found
  {
    constexpr char key[] = "h";
    constexpr char merge_op1[] = "hv1";
    constexpr char merge_op2[] = "hv2";

    ASSERT_OK(batch_->Delete(key));
    ASSERT_OK(batch_->Merge(key, merge_op1));
    ASSERT_OK(batch_->Merge(key, merge_op2));

    PinnableWideColumns result;
    ASSERT_OK(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, &result));

    const WideColumns expected{{kDefaultWideColumnName, "hv1,hv2"}};
    ASSERT_EQ(result.columns(), expected);
  }

  // Validate parameters
  {
    constexpr char key[] = "foo";
    PinnableWideColumns result;

    // Null column family handle
    ASSERT_TRUE(
        batch_->GetEntityFromBatch(nullptr, key, &result).IsInvalidArgument());

    // Null output object
    ASSERT_TRUE(
        batch_->GetEntityFromBatch(db_->DefaultColumnFamily(), key, nullptr)
            .IsInvalidArgument());
  }
}
INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE