mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 02:44:18 +00:00
2c1f95291d
Summary: When constructing a write batch a client may now call MarkWalTerminationPoint() on that batch. No batch operations after this call will be added written to the WAL but will still be inserted into the Memtable. This facility is used to remove one of the three WriteImpl calls in 2PC transactions. This produces a ~1% perf improvement. ``` RocksDB - unoptimized 2pc, sync_binlog=1, disable_2pc=off INFO 2016-08-31 14:30:38,814 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2619 seconds. Requests/second = 28628 RocksDB - optimized 2pc , sync_binlog=1, disable_2pc=off INFO 2016-08-31 16:26:59,442 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2581 seconds. Requests/second = 29054 ``` Test Plan: Two unit tests added. Reviewers: sdong, yiwu, IslamAbdelRahman Reviewed By: yiwu Subscribers: hermanlee4, dhruba, andrewkr Differential Revision: https://reviews.facebook.net/D64599
861 lines
26 KiB
C++
861 lines
26 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "rocksdb/db.h"
|
|
|
|
#include <memory>
|
|
#include "db/column_family.h"
|
|
#include "db/memtable.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/utilities/write_batch_with_index.h"
|
|
#include "rocksdb/write_buffer_manager.h"
|
|
#include "table/scoped_arena_iterator.h"
|
|
#include "util/logging.h"
|
|
#include "util/string_util.h"
|
|
#include "util/testharness.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
static std::string PrintContents(WriteBatch* b) {
|
|
InternalKeyComparator cmp(BytewiseComparator());
|
|
auto factory = std::make_shared<SkipListFactory>();
|
|
Options options;
|
|
options.memtable_factory = factory;
|
|
ImmutableCFOptions ioptions(options);
|
|
WriteBufferManager wb(options.db_write_buffer_size);
|
|
MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
|
|
kMaxSequenceNumber);
|
|
mem->Ref();
|
|
std::string state;
|
|
ColumnFamilyMemTablesDefault cf_mems_default(mem);
|
|
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr);
|
|
int count = 0;
|
|
int put_count = 0;
|
|
int delete_count = 0;
|
|
int single_delete_count = 0;
|
|
int delete_range_count = 0;
|
|
int merge_count = 0;
|
|
for (int i = 0; i < 2; ++i) {
|
|
Arena arena;
|
|
auto iter =
|
|
i == 0 ? ScopedArenaIterator(mem->NewIterator(ReadOptions(), &arena))
|
|
: ScopedArenaIterator(
|
|
mem->NewRangeTombstoneIterator(ReadOptions(), &arena));
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
ParsedInternalKey ikey;
|
|
memset((void*)&ikey, 0, sizeof(ikey));
|
|
EXPECT_TRUE(ParseInternalKey(iter->key(), &ikey));
|
|
switch (ikey.type) {
|
|
case kTypeValue:
|
|
state.append("Put(");
|
|
state.append(ikey.user_key.ToString());
|
|
state.append(", ");
|
|
state.append(iter->value().ToString());
|
|
state.append(")");
|
|
count++;
|
|
put_count++;
|
|
break;
|
|
case kTypeDeletion:
|
|
state.append("Delete(");
|
|
state.append(ikey.user_key.ToString());
|
|
state.append(")");
|
|
count++;
|
|
delete_count++;
|
|
break;
|
|
case kTypeSingleDeletion:
|
|
state.append("SingleDelete(");
|
|
state.append(ikey.user_key.ToString());
|
|
state.append(")");
|
|
count++;
|
|
single_delete_count++;
|
|
break;
|
|
case kTypeRangeDeletion:
|
|
state.append("DeleteRange(");
|
|
state.append(ikey.user_key.ToString());
|
|
state.append(", ");
|
|
state.append(iter->value().ToString());
|
|
state.append(")");
|
|
count++;
|
|
delete_range_count++;
|
|
break;
|
|
case kTypeMerge:
|
|
state.append("Merge(");
|
|
state.append(ikey.user_key.ToString());
|
|
state.append(", ");
|
|
state.append(iter->value().ToString());
|
|
state.append(")");
|
|
count++;
|
|
merge_count++;
|
|
break;
|
|
default:
|
|
assert(false);
|
|
break;
|
|
}
|
|
state.append("@");
|
|
state.append(NumberToString(ikey.sequence));
|
|
}
|
|
}
|
|
EXPECT_EQ(b->HasPut(), put_count > 0);
|
|
EXPECT_EQ(b->HasDelete(), delete_count > 0);
|
|
EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
|
|
EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0);
|
|
EXPECT_EQ(b->HasMerge(), merge_count > 0);
|
|
if (!s.ok()) {
|
|
state.append(s.ToString());
|
|
} else if (count != WriteBatchInternal::Count(b)) {
|
|
state.append("CountMismatch()");
|
|
}
|
|
delete mem->Unref();
|
|
return state;
|
|
}
|
|
|
|
class WriteBatchTest : public testing::Test {};
|
|
|
|
TEST_F(WriteBatchTest, Empty) {
|
|
WriteBatch batch;
|
|
ASSERT_EQ("", PrintContents(&batch));
|
|
ASSERT_EQ(0, WriteBatchInternal::Count(&batch));
|
|
ASSERT_EQ(0, batch.Count());
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, Multiple) {
|
|
WriteBatch batch;
|
|
batch.Put(Slice("foo"), Slice("bar"));
|
|
batch.Delete(Slice("box"));
|
|
batch.DeleteRange(Slice("bar"), Slice("foo"));
|
|
batch.Put(Slice("baz"), Slice("boo"));
|
|
WriteBatchInternal::SetSequence(&batch, 100);
|
|
ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch));
|
|
ASSERT_EQ(4, WriteBatchInternal::Count(&batch));
|
|
ASSERT_EQ(
|
|
"Put(baz, boo)@103"
|
|
"Delete(box)@101"
|
|
"Put(foo, bar)@100"
|
|
"DeleteRange(bar, foo)@102",
|
|
PrintContents(&batch));
|
|
ASSERT_EQ(4, batch.Count());
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, Corruption) {
|
|
WriteBatch batch;
|
|
batch.Put(Slice("foo"), Slice("bar"));
|
|
batch.Delete(Slice("box"));
|
|
WriteBatchInternal::SetSequence(&batch, 200);
|
|
Slice contents = WriteBatchInternal::Contents(&batch);
|
|
WriteBatchInternal::SetContents(&batch,
|
|
Slice(contents.data(),contents.size()-1));
|
|
ASSERT_EQ("Put(foo, bar)@200"
|
|
"Corruption: bad WriteBatch Delete",
|
|
PrintContents(&batch));
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, Append) {
|
|
WriteBatch b1, b2;
|
|
WriteBatchInternal::SetSequence(&b1, 200);
|
|
WriteBatchInternal::SetSequence(&b2, 300);
|
|
WriteBatchInternal::Append(&b1, &b2);
|
|
ASSERT_EQ("",
|
|
PrintContents(&b1));
|
|
ASSERT_EQ(0, b1.Count());
|
|
b2.Put("a", "va");
|
|
WriteBatchInternal::Append(&b1, &b2);
|
|
ASSERT_EQ("Put(a, va)@200",
|
|
PrintContents(&b1));
|
|
ASSERT_EQ(1, b1.Count());
|
|
b2.Clear();
|
|
b2.Put("b", "vb");
|
|
WriteBatchInternal::Append(&b1, &b2);
|
|
ASSERT_EQ("Put(a, va)@200"
|
|
"Put(b, vb)@201",
|
|
PrintContents(&b1));
|
|
ASSERT_EQ(2, b1.Count());
|
|
b2.Delete("foo");
|
|
WriteBatchInternal::Append(&b1, &b2);
|
|
ASSERT_EQ("Put(a, va)@200"
|
|
"Put(b, vb)@202"
|
|
"Put(b, vb)@201"
|
|
"Delete(foo)@203",
|
|
PrintContents(&b1));
|
|
ASSERT_EQ(4, b1.Count());
|
|
b2.Clear();
|
|
b2.Put("c", "cc");
|
|
b2.Put("d", "dd");
|
|
b2.MarkWalTerminationPoint();
|
|
b2.Put("e", "ee");
|
|
WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true);
|
|
ASSERT_EQ(
|
|
"Put(a, va)@200"
|
|
"Put(b, vb)@202"
|
|
"Put(b, vb)@201"
|
|
"Put(c, cc)@204"
|
|
"Put(d, dd)@205"
|
|
"Delete(foo)@203",
|
|
PrintContents(&b1));
|
|
ASSERT_EQ(6, b1.Count());
|
|
ASSERT_EQ(
|
|
"Put(c, cc)@0"
|
|
"Put(d, dd)@1"
|
|
"Put(e, ee)@2",
|
|
PrintContents(&b2));
|
|
ASSERT_EQ(3, b2.Count());
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, SingleDeletion) {
|
|
WriteBatch batch;
|
|
WriteBatchInternal::SetSequence(&batch, 100);
|
|
ASSERT_EQ("", PrintContents(&batch));
|
|
ASSERT_EQ(0, batch.Count());
|
|
batch.Put("a", "va");
|
|
ASSERT_EQ("Put(a, va)@100", PrintContents(&batch));
|
|
ASSERT_EQ(1, batch.Count());
|
|
batch.SingleDelete("a");
|
|
ASSERT_EQ(
|
|
"SingleDelete(a)@101"
|
|
"Put(a, va)@100",
|
|
PrintContents(&batch));
|
|
ASSERT_EQ(2, batch.Count());
|
|
}
|
|
|
|
namespace {
|
|
struct TestHandler : public WriteBatch::Handler {
|
|
std::string seen;
|
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
if (column_family_id == 0) {
|
|
seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
|
|
} else {
|
|
seen += "PutCF(" + ToString(column_family_id) + ", " +
|
|
key.ToString() + ", " + value.ToString() + ")";
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual Status DeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
if (column_family_id == 0) {
|
|
seen += "Delete(" + key.ToString() + ")";
|
|
} else {
|
|
seen += "DeleteCF(" + ToString(column_family_id) + ", " +
|
|
key.ToString() + ")";
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual Status SingleDeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
if (column_family_id == 0) {
|
|
seen += "SingleDelete(" + key.ToString() + ")";
|
|
} else {
|
|
seen += "SingleDeleteCF(" + ToString(column_family_id) + ", " +
|
|
key.ToString() + ")";
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual Status DeleteRangeCF(uint32_t column_family_id,
|
|
const Slice& begin_key,
|
|
const Slice& end_key) override {
|
|
if (column_family_id == 0) {
|
|
seen += "DeleteRange(" + begin_key.ToString() + ", " +
|
|
end_key.ToString() + ")";
|
|
} else {
|
|
seen += "DeleteRangeCF(" + ToString(column_family_id) + ", " +
|
|
begin_key.ToString() + ", " + end_key.ToString() + ")";
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
if (column_family_id == 0) {
|
|
seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
|
|
} else {
|
|
seen += "MergeCF(" + ToString(column_family_id) + ", " +
|
|
key.ToString() + ", " + value.ToString() + ")";
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual void LogData(const Slice& blob) override {
|
|
seen += "LogData(" + blob.ToString() + ")";
|
|
}
|
|
virtual Status MarkBeginPrepare() override {
|
|
seen += "MarkBeginPrepare()";
|
|
return Status::OK();
|
|
}
|
|
virtual Status MarkEndPrepare(const Slice& xid) override {
|
|
seen += "MarkEndPrepare(" + xid.ToString() + ")";
|
|
return Status::OK();
|
|
}
|
|
virtual Status MarkCommit(const Slice& xid) override {
|
|
seen += "MarkCommit(" + xid.ToString() + ")";
|
|
return Status::OK();
|
|
}
|
|
virtual Status MarkRollback(const Slice& xid) override {
|
|
seen += "MarkRollback(" + xid.ToString() + ")";
|
|
return Status::OK();
|
|
}
|
|
};
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, PutNotImplemented) {
|
|
WriteBatch batch;
|
|
batch.Put(Slice("k1"), Slice("v1"));
|
|
ASSERT_EQ(1, batch.Count());
|
|
ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
|
|
|
|
WriteBatch::Handler handler;
|
|
ASSERT_OK(batch.Iterate(&handler));
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, DeleteNotImplemented) {
|
|
WriteBatch batch;
|
|
batch.Delete(Slice("k2"));
|
|
ASSERT_EQ(1, batch.Count());
|
|
ASSERT_EQ("Delete(k2)@0", PrintContents(&batch));
|
|
|
|
WriteBatch::Handler handler;
|
|
ASSERT_OK(batch.Iterate(&handler));
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
|
|
WriteBatch batch;
|
|
batch.SingleDelete(Slice("k2"));
|
|
ASSERT_EQ(1, batch.Count());
|
|
ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch));
|
|
|
|
WriteBatch::Handler handler;
|
|
ASSERT_OK(batch.Iterate(&handler));
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, MergeNotImplemented) {
|
|
WriteBatch batch;
|
|
batch.Merge(Slice("foo"), Slice("bar"));
|
|
ASSERT_EQ(1, batch.Count());
|
|
ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch));
|
|
|
|
WriteBatch::Handler handler;
|
|
ASSERT_OK(batch.Iterate(&handler));
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, Blob) {
|
|
WriteBatch batch;
|
|
batch.Put(Slice("k1"), Slice("v1"));
|
|
batch.Put(Slice("k2"), Slice("v2"));
|
|
batch.Put(Slice("k3"), Slice("v3"));
|
|
batch.PutLogData(Slice("blob1"));
|
|
batch.Delete(Slice("k2"));
|
|
batch.SingleDelete(Slice("k3"));
|
|
batch.PutLogData(Slice("blob2"));
|
|
batch.Merge(Slice("foo"), Slice("bar"));
|
|
ASSERT_EQ(6, batch.Count());
|
|
ASSERT_EQ(
|
|
"Merge(foo, bar)@5"
|
|
"Put(k1, v1)@0"
|
|
"Delete(k2)@3"
|
|
"Put(k2, v2)@1"
|
|
"SingleDelete(k3)@4"
|
|
"Put(k3, v3)@2",
|
|
PrintContents(&batch));
|
|
|
|
TestHandler handler;
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(
|
|
"Put(k1, v1)"
|
|
"Put(k2, v2)"
|
|
"Put(k3, v3)"
|
|
"LogData(blob1)"
|
|
"Delete(k2)"
|
|
"SingleDelete(k3)"
|
|
"LogData(blob2)"
|
|
"Merge(foo, bar)",
|
|
handler.seen);
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, PrepareCommit) {
|
|
WriteBatch batch;
|
|
WriteBatchInternal::InsertNoop(&batch);
|
|
batch.Put(Slice("k1"), Slice("v1"));
|
|
batch.Put(Slice("k2"), Slice("v2"));
|
|
batch.SetSavePoint();
|
|
WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1"));
|
|
Status s = batch.RollbackToSavePoint();
|
|
ASSERT_EQ(s, Status::NotFound());
|
|
WriteBatchInternal::MarkCommit(&batch, Slice("xid1"));
|
|
WriteBatchInternal::MarkRollback(&batch, Slice("xid1"));
|
|
ASSERT_EQ(2, batch.Count());
|
|
|
|
TestHandler handler;
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(
|
|
"MarkBeginPrepare()"
|
|
"Put(k1, v1)"
|
|
"Put(k2, v2)"
|
|
"MarkEndPrepare(xid1)"
|
|
"MarkCommit(xid1)"
|
|
"MarkRollback(xid1)",
|
|
handler.seen);
|
|
}
|
|
|
|
// It requires more than 30GB of memory to run the test. With single memory
|
|
// allocation of more than 30GB.
|
|
// Not all platform can run it. Also it runs a long time. So disable it.
|
|
TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
|
|
// Insert key and value of 3GB and push total batch size to 12GB.
|
|
static const size_t kKeyValueSize = 4u;
|
|
static const uint32_t kNumUpdates = 3 << 30;
|
|
std::string raw(kKeyValueSize, 'A');
|
|
WriteBatch batch(kNumUpdates * (4 + kKeyValueSize * 2) + 1024u);
|
|
char c = 'A';
|
|
for (uint32_t i = 0; i < kNumUpdates; i++) {
|
|
if (c > 'Z') {
|
|
c = 'A';
|
|
}
|
|
raw[0] = c;
|
|
raw[raw.length() - 1] = c;
|
|
c++;
|
|
batch.Put(raw, raw);
|
|
}
|
|
|
|
ASSERT_EQ(kNumUpdates, batch.Count());
|
|
|
|
struct NoopHandler : public WriteBatch::Handler {
|
|
uint32_t num_seen = 0;
|
|
char expected_char = 'A';
|
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
EXPECT_EQ(kKeyValueSize, key.size());
|
|
EXPECT_EQ(kKeyValueSize, value.size());
|
|
EXPECT_EQ(expected_char, key[0]);
|
|
EXPECT_EQ(expected_char, value[0]);
|
|
EXPECT_EQ(expected_char, key[kKeyValueSize - 1]);
|
|
EXPECT_EQ(expected_char, value[kKeyValueSize - 1]);
|
|
expected_char++;
|
|
if (expected_char > 'Z') {
|
|
expected_char = 'A';
|
|
}
|
|
++num_seen;
|
|
return Status::OK();
|
|
}
|
|
virtual Status DeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual Status SingleDeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); }
|
|
virtual bool Continue() override { return num_seen < kNumUpdates; }
|
|
} handler;
|
|
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(kNumUpdates, handler.num_seen);
|
|
}
|
|
|
|
// The test requires more than 18GB memory to run it, with single memory
|
|
// allocation of more than 12GB. Not all the platform can run it. So disable it.
|
|
TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
|
|
// Insert key and value of 3GB and push total batch size to 12GB.
|
|
static const size_t kKeyValueSize = 3221225472u;
|
|
std::string raw(kKeyValueSize, 'A');
|
|
WriteBatch batch(12884901888u + 1024u);
|
|
for (char i = 0; i < 2; i++) {
|
|
raw[0] = 'A' + i;
|
|
raw[raw.length() - 1] = 'A' - i;
|
|
batch.Put(raw, raw);
|
|
}
|
|
|
|
ASSERT_EQ(2, batch.Count());
|
|
|
|
struct NoopHandler : public WriteBatch::Handler {
|
|
int num_seen = 0;
|
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
EXPECT_EQ(kKeyValueSize, key.size());
|
|
EXPECT_EQ(kKeyValueSize, value.size());
|
|
EXPECT_EQ('A' + num_seen, key[0]);
|
|
EXPECT_EQ('A' + num_seen, value[0]);
|
|
EXPECT_EQ('A' - num_seen, key[kKeyValueSize - 1]);
|
|
EXPECT_EQ('A' - num_seen, value[kKeyValueSize - 1]);
|
|
++num_seen;
|
|
return Status::OK();
|
|
}
|
|
virtual Status DeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual Status SingleDeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
EXPECT_TRUE(false);
|
|
return Status::OK();
|
|
}
|
|
virtual void LogData(const Slice& blob) override { EXPECT_TRUE(false); }
|
|
virtual bool Continue() override { return num_seen < 2; }
|
|
} handler;
|
|
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(2, handler.num_seen);
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, Continue) {
|
|
WriteBatch batch;
|
|
|
|
struct Handler : public TestHandler {
|
|
int num_seen = 0;
|
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
++num_seen;
|
|
return TestHandler::PutCF(column_family_id, key, value);
|
|
}
|
|
virtual Status DeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
++num_seen;
|
|
return TestHandler::DeleteCF(column_family_id, key);
|
|
}
|
|
virtual Status SingleDeleteCF(uint32_t column_family_id,
|
|
const Slice& key) override {
|
|
++num_seen;
|
|
return TestHandler::SingleDeleteCF(column_family_id, key);
|
|
}
|
|
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
|
|
const Slice& value) override {
|
|
++num_seen;
|
|
return TestHandler::MergeCF(column_family_id, key, value);
|
|
}
|
|
virtual void LogData(const Slice& blob) override {
|
|
++num_seen;
|
|
TestHandler::LogData(blob);
|
|
}
|
|
virtual bool Continue() override { return num_seen < 5; }
|
|
} handler;
|
|
|
|
batch.Put(Slice("k1"), Slice("v1"));
|
|
batch.Put(Slice("k2"), Slice("v2"));
|
|
batch.PutLogData(Slice("blob1"));
|
|
batch.Delete(Slice("k1"));
|
|
batch.SingleDelete(Slice("k2"));
|
|
batch.PutLogData(Slice("blob2"));
|
|
batch.Merge(Slice("foo"), Slice("bar"));
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(
|
|
"Put(k1, v1)"
|
|
"Put(k2, v2)"
|
|
"LogData(blob1)"
|
|
"Delete(k1)"
|
|
"SingleDelete(k2)",
|
|
handler.seen);
|
|
}
|
|
|
|
TEST_F(WriteBatchTest, PutGatherSlices) {
|
|
WriteBatch batch;
|
|
batch.Put(Slice("foo"), Slice("bar"));
|
|
|
|
{
|
|
// Try a write where the key is one slice but the value is two
|
|
Slice key_slice("baz");
|
|
Slice value_slices[2] = { Slice("header"), Slice("payload") };
|
|
batch.Put(SliceParts(&key_slice, 1),
|
|
SliceParts(value_slices, 2));
|
|
}
|
|
|
|
{
|
|
// One where the key is composite but the value is a single slice
|
|
Slice key_slices[3] = { Slice("key"), Slice("part2"), Slice("part3") };
|
|
Slice value_slice("value");
|
|
batch.Put(SliceParts(key_slices, 3),
|
|
SliceParts(&value_slice, 1));
|
|
}
|
|
|
|
WriteBatchInternal::SetSequence(&batch, 100);
|
|
ASSERT_EQ("Put(baz, headerpayload)@101"
|
|
"Put(foo, bar)@100"
|
|
"Put(keypart2part3, value)@102",
|
|
PrintContents(&batch));
|
|
ASSERT_EQ(3, batch.Count());
|
|
}
|
|
|
|
namespace {
|
|
class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
|
|
public:
|
|
explicit ColumnFamilyHandleImplDummy(int id)
|
|
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
|
|
uint32_t GetID() const override { return id_; }
|
|
const Comparator* GetComparator() const override {
|
|
return BytewiseComparator();
|
|
}
|
|
|
|
private:
|
|
uint32_t id_;
|
|
};
|
|
} // namespace anonymous
|
|
|
|
TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
|
|
WriteBatch batch;
|
|
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
|
|
batch.Put(&zero, Slice("foo"), Slice("bar"));
|
|
batch.Put(&two, Slice("twofoo"), Slice("bar2"));
|
|
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
|
|
batch.Delete(&eight, Slice("eightfoo"));
|
|
batch.SingleDelete(&two, Slice("twofoo"));
|
|
batch.DeleteRange(&two, Slice("3foo"), Slice("4foo"));
|
|
batch.Merge(&three, Slice("threethree"), Slice("3three"));
|
|
batch.Put(&zero, Slice("foo"), Slice("bar"));
|
|
batch.Merge(Slice("omom"), Slice("nom"));
|
|
|
|
TestHandler handler;
|
|
batch.Iterate(&handler);
|
|
ASSERT_EQ(
|
|
"Put(foo, bar)"
|
|
"PutCF(2, twofoo, bar2)"
|
|
"PutCF(8, eightfoo, bar8)"
|
|
"DeleteCF(8, eightfoo)"
|
|
"SingleDeleteCF(2, twofoo)"
|
|
"DeleteRangeCF(2, 3foo, 4foo)"
|
|
"MergeCF(3, threethree, 3three)"
|
|
"Put(foo, bar)"
|
|
"Merge(omom, nom)",
|
|
handler.seen);
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
|
|
WriteBatchWithIndex batch;
|
|
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
|
|
batch.Put(&zero, Slice("foo"), Slice("bar"));
|
|
batch.Put(&two, Slice("twofoo"), Slice("bar2"));
|
|
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
|
|
batch.Delete(&eight, Slice("eightfoo"));
|
|
batch.SingleDelete(&two, Slice("twofoo"));
|
|
batch.DeleteRange(&two, Slice("twofoo"), Slice("threefoo"));
|
|
batch.Merge(&three, Slice("threethree"), Slice("3three"));
|
|
batch.Put(&zero, Slice("foo"), Slice("bar"));
|
|
batch.Merge(Slice("omom"), Slice("nom"));
|
|
|
|
std::unique_ptr<WBWIIterator> iter;
|
|
|
|
iter.reset(batch.NewIterator(&eight));
|
|
iter->Seek("eightfoo");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
|
|
ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
|
|
ASSERT_EQ("bar8", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kDeleteRecord, iter->Entry().type);
|
|
ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(!iter->Valid());
|
|
|
|
iter.reset(batch.NewIterator(&two));
|
|
iter->Seek("twofoo");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
|
|
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
|
|
ASSERT_EQ("bar2", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type);
|
|
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kDeleteRangeRecord, iter->Entry().type);
|
|
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
|
|
ASSERT_EQ("threefoo", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(!iter->Valid());
|
|
|
|
iter.reset(batch.NewIterator());
|
|
iter->Seek("gggg");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
|
|
ASSERT_EQ("omom", iter->Entry().key.ToString());
|
|
ASSERT_EQ("nom", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(!iter->Valid());
|
|
|
|
iter.reset(batch.NewIterator(&zero));
|
|
iter->Seek("foo");
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
|
|
ASSERT_EQ("foo", iter->Entry().key.ToString());
|
|
ASSERT_EQ("bar", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
|
|
ASSERT_EQ("foo", iter->Entry().key.ToString());
|
|
ASSERT_EQ("bar", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(iter->Valid());
|
|
ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
|
|
ASSERT_EQ("omom", iter->Entry().key.ToString());
|
|
ASSERT_EQ("nom", iter->Entry().value.ToString());
|
|
|
|
iter->Next();
|
|
ASSERT_OK(iter->status());
|
|
ASSERT_TRUE(!iter->Valid());
|
|
|
|
TestHandler handler;
|
|
batch.GetWriteBatch()->Iterate(&handler);
|
|
ASSERT_EQ(
|
|
"Put(foo, bar)"
|
|
"PutCF(2, twofoo, bar2)"
|
|
"PutCF(8, eightfoo, bar8)"
|
|
"DeleteCF(8, eightfoo)"
|
|
"SingleDeleteCF(2, twofoo)"
|
|
"DeleteRangeCF(2, twofoo, threefoo)"
|
|
"MergeCF(3, threethree, 3three)"
|
|
"Put(foo, bar)"
|
|
"Merge(omom, nom)",
|
|
handler.seen);
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
TEST_F(WriteBatchTest, SavePointTest) {
|
|
Status s;
|
|
WriteBatch batch;
|
|
batch.SetSavePoint();
|
|
|
|
batch.Put("A", "a");
|
|
batch.Put("B", "b");
|
|
batch.SetSavePoint();
|
|
|
|
batch.Put("C", "c");
|
|
batch.Delete("A");
|
|
batch.SetSavePoint();
|
|
batch.SetSavePoint();
|
|
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_EQ(
|
|
"Delete(A)@3"
|
|
"Put(A, a)@0"
|
|
"Put(B, b)@1"
|
|
"Put(C, c)@2",
|
|
PrintContents(&batch));
|
|
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_EQ(
|
|
"Put(A, a)@0"
|
|
"Put(B, b)@1",
|
|
PrintContents(&batch));
|
|
|
|
batch.Delete("A");
|
|
batch.Put("B", "bb");
|
|
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_EQ("", PrintContents(&batch));
|
|
|
|
s = batch.RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
ASSERT_EQ("", PrintContents(&batch));
|
|
|
|
batch.Put("D", "d");
|
|
batch.Delete("A");
|
|
|
|
batch.SetSavePoint();
|
|
|
|
batch.Put("A", "aaa");
|
|
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_EQ(
|
|
"Delete(A)@1"
|
|
"Put(D, d)@0",
|
|
PrintContents(&batch));
|
|
|
|
batch.SetSavePoint();
|
|
|
|
batch.Put("D", "d");
|
|
batch.Delete("A");
|
|
|
|
ASSERT_OK(batch.RollbackToSavePoint());
|
|
ASSERT_EQ(
|
|
"Delete(A)@1"
|
|
"Put(D, d)@0",
|
|
PrintContents(&batch));
|
|
|
|
s = batch.RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
ASSERT_EQ(
|
|
"Delete(A)@1"
|
|
"Put(D, d)@0",
|
|
PrintContents(&batch));
|
|
|
|
WriteBatch batch2;
|
|
|
|
s = batch2.RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
ASSERT_EQ("", PrintContents(&batch2));
|
|
|
|
batch2.Delete("A");
|
|
batch2.SetSavePoint();
|
|
|
|
s = batch2.RollbackToSavePoint();
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("Delete(A)@0", PrintContents(&batch2));
|
|
|
|
batch2.Clear();
|
|
ASSERT_EQ("", PrintContents(&batch2));
|
|
|
|
batch2.SetSavePoint();
|
|
|
|
batch2.Delete("B");
|
|
ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
|
|
|
|
batch2.SetSavePoint();
|
|
s = batch2.RollbackToSavePoint();
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
|
|
|
|
s = batch2.RollbackToSavePoint();
|
|
ASSERT_OK(s);
|
|
ASSERT_EQ("", PrintContents(&batch2));
|
|
|
|
s = batch2.RollbackToSavePoint();
|
|
ASSERT_TRUE(s.IsNotFound());
|
|
ASSERT_EQ("", PrintContents(&batch2));
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|