rocksdb/db/db_test2.cc
Islam AbdelRahman 68a8e6b8fa Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff updates the code to pin the merge operator operands while the merge operation is in progress, so that we can eliminate the memcpy cost. To do that, we need a new public API for FullMerge that replaces std::deque<std::string> with std::vector<Slice>.

This diff is stacked on top of D56493 and D56511

In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand

```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq      :       0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq      :       0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq      :       0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq      :       0.247 micros/op 4043927 ops/sec; 39553.2 MB/s

[master]
readseq      :       3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq      :       3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq      :       3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq      :       3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq      :       4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```

```
[Everything in Memtable | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq      :       2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq      :       1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq      :       1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq      :       1.250 micros/op 800000 ops/sec; 7824.7 MB/s

[master]
readseq      :      24.025 micros/op 41623 ops/sec;  407.1 MB/s
readseq      :      18.489 micros/op 54086 ops/sec;  529.0 MB/s
readseq      :      18.693 micros/op 53495 ops/sec;  523.2 MB/s
readseq      :      23.621 micros/op 42335 ops/sec;  414.1 MB/s
readseq      :      18.775 micros/op 53262 ops/sec;  521.0 MB/s

```

```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]

[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq      :      14.741 micros/op 67837 ops/sec;  663.5 MB/s
readseq      :       1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq      :       0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq      :       0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq      :       0.943 micros/op 1060657 ops/sec; 10374.2 MB/s

[master]
readseq      :      16.735 micros/op 59755 ops/sec;  584.5 MB/s
readseq      :       3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq      :       3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq      :       3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq      :       3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions

[FullMergeV2]
readseq      :      24.325 micros/op 41109 ops/sec;  402.1 MB/s
readseq      :       1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq      :       1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq      :       1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq      :       1.109 micros/op 901713 ops/sec; 8819.6 MB/s

[master]
readseq      :      27.257 micros/op 36687 ops/sec;  358.8 MB/s
readseq      :       4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq      :       5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq      :       4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq      :       4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, andrewkr, sdong

Reviewed By: sdong

Subscribers: lovro, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57075
2016-07-20 09:49:03 -07:00

1758 lines
61 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <cstdlib>
#include <functional>
#include <memory>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h"
namespace rocksdb {
// Test fixture shared by the TEST_F(DBTest2, ...) cases in this file. It only
// forwards the "/db_test2" path argument to the DBTestBase constructor.
class DBTest2 : public DBTestBase {
 public:
  DBTest2()
      : DBTestBase("/db_test2") {
  }
};
// Value-parameterized fixture. The bool test parameter is latched into
// if_cache_filter_, which the test body uses to decide whether index/filter
// blocks are placed in a block cache.
class PrefixFullBloomWithReverseComparator
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  PrefixFullBloomWithReverseComparator()
      : DBTestBase("/prefix_bloom_reverse") {}

  // Copies the current test parameter before the test body runs.
  void SetUp() override { if_cache_filter_ = GetParam(); }

  // Default-initialized so the flag never holds an indeterminate value, even
  // if it is read before SetUp() has run.
  bool if_cache_filter_ = false;
};
// Checks iterator Seek()/Next() results on a DB that combines
// ReverseBytewiseComparator, a capped 3-byte prefix extractor and a bloom
// filter with whole-key filtering disabled. When the test parameter is true,
// index and filter blocks are additionally routed through a tiny LRU cache.
TEST_P(PrefixFullBloomWithReverseComparator,
       PrefixFullBloomWithReverseComparator) {
  Options options = last_options_;
  options.comparator = ReverseBytewiseComparator();
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  options.statistics = rocksdb::CreateDBStatistics();

  BlockBasedTableOptions bbto;
  if (if_cache_filter_) {
    bbto.no_block_cache = false;
    bbto.cache_index_and_filter_blocks = true;
    bbto.block_cache = NewLRUCache(1);
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));

  // The reads below are meant to hit the flushed table, so a failed flush
  // must fail the test instead of being silently ignored.
  ASSERT_OK(dbfull()->Flush(FlushOptions()));

  if (bbto.block_cache) {
    bbto.block_cache->EraseUnRefEntries();
  }

  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));

  // With the reverse comparator, keys are visited in descending bytewise
  // order: seeking to "bar345" lands on "bar234", followed by "bar123".
  iter->Seek("bar345");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar234", iter->key().ToString());
  ASSERT_EQ("foo2", iter->value().ToString());
  iter->Next();
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar123", iter->key().ToString());
  ASSERT_EQ("foo", iter->value().ToString());

  iter->Seek("foo234");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo123", iter->key().ToString());
  ASSERT_EQ("foo3", iter->value().ToString());

  // Seeking to the bare prefix "bar" must leave the iterator invalid without
  // reporting an error.
  iter->Seek("bar");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(!iter->Valid());
}
// Instantiates the PrefixFullBloomWithReverseComparator suite with the
// parameter values produced by testing::Bool().
INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
PrefixFullBloomWithReverseComparator, testing::Bool());
// Checks the "rocksdb.iterator.super-version-number" iterator property:
//  * an iterator created after a Put + Flush reports a larger number than one
//    created before them;
//  * a further Put alone leaves the number reported by a new iterator
//    unchanged;
//  * an existing iterator keeps reporting its original number after
//    SeekToFirst().
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
  const std::string kProperty = "rocksdb.iterator.super-version-number";
  std::string prop_value;

  ASSERT_OK(Put("", ""));
  // Iterators are owned by unique_ptr so they are released even when an
  // ASSERT_* below returns from the test body early.
  std::unique_ptr<Iterator> iter1(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter1->GetProperty(kProperty, &prop_value));
  const uint64_t version_number1 =
      static_cast<uint64_t>(std::strtoull(prop_value.c_str(), nullptr, 10));

  ASSERT_OK(Put("", ""));
  ASSERT_OK(Flush());

  std::unique_ptr<Iterator> iter2(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter2->GetProperty(kProperty, &prop_value));
  const uint64_t version_number2 =
      static_cast<uint64_t>(std::strtoull(prop_value.c_str(), nullptr, 10));
  ASSERT_GT(version_number2, version_number1);

  ASSERT_OK(Put("", ""));

  std::unique_ptr<Iterator> iter3(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter3->GetProperty(kProperty, &prop_value));
  const uint64_t version_number3 =
      static_cast<uint64_t>(std::strtoull(prop_value.c_str(), nullptr, 10));
  ASSERT_EQ(version_number2, version_number3);

  iter1->SeekToFirst();
  ASSERT_OK(iter1->GetProperty(kProperty, &prop_value));
  const uint64_t version_number1_new =
      static_cast<uint64_t>(std::strtoull(prop_value.c_str(), nullptr, 10));
  ASSERT_EQ(version_number1, version_number1_new);
}
// Writes two keys into the "pikachu" column family with
// cache_index_and_filter_blocks enabled, flushes them, reopens the DB and
// reads one key back.
TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();

  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "a", "begin"));
  ASSERT_OK(Put(1, "z", "end"));
  ASSERT_OK(Flush(1));

  // A failed reopen or a wrong value after restart must fail the test rather
  // than pass unnoticed.
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  ASSERT_EQ("begin", Get(1, "a"));
}
#ifndef ROCKSDB_LITE
// Value-parameterized fixture. The bool test parameter is latched into
// use_old_interface_, which the test body uses to choose between
// Options::db_write_buffer_size and an explicit WriteBufferManager.
class DBTestSharedWriteBufferAcrossCFs
    : public DBTestBase,
      public testing::WithParamInterface<bool> {
 public:
  DBTestSharedWriteBufferAcrossCFs()
      : DBTestBase("/db_test_shared_write_buffer") {}

  // Copies the current test parameter before the test body runs.
  void SetUp() override { use_old_interface_ = GetParam(); }

  // Default-initialized so the flag never holds an indeterminate value, even
  // if it is read before SetUp() has run.
  bool use_old_interface_ = false;
};
// Exercises a 100000-byte write-buffer limit shared by four column families
// ("default", "pikachu", "dobrynia", "nikitich"). After each group of writes
// the test waits for pending memtable flushes and checks, via per-CF SST file
// counts, which column family was flushed.
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
  Options options = CurrentOptions();
  if (use_old_interface_) {
    options.db_write_buffer_size = 100000;  // this is the real limit
  } else {
    options.write_buffer_manager.reset(new WriteBufferManager(100000));
  }
  options.write_buffer_size = 500000;  // this is never hit
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  // Waits for pending memtable flushes of handles_[first_cf] .. handles_[3]
  // and fails the test if any wait reports an error.
  auto wait_for_flushes = [this](size_t first_cf) {
    for (size_t cf = first_cf; cf < 4; ++cf) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
    }
  };

  // Asserts the number of SST files currently held by each column family.
  auto expect_sst_counts = [this](uint64_t in_default, uint64_t in_pikachu,
                                  uint64_t in_dobrynia,
                                  uint64_t in_nikitich) {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), in_default);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), in_pikachu);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              in_dobrynia);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              in_nikitich);
  };

  // Trigger a flush on CF "nikitich"
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(3, Key(1), DummyString(90000)));
  ASSERT_OK(Put(2, Key(2), DummyString(20000)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_flushes(0));
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 0, 1));

  // "dobrynia": 20KB
  // Flush 'dobrynia'
  ASSERT_OK(Put(3, Key(2), DummyString(40000)));
  ASSERT_OK(Put(2, Key(2), DummyString(70000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_flushes(1));
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 1, 1));

  // "nikitich" still has data of 80KB
  // Inserting Data in "dobrynia" triggers "nikitich" flushing.
  ASSERT_OK(Put(3, Key(2), DummyString(40000)));
  ASSERT_OK(Put(2, Key(2), DummyString(40000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_flushes(1));
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 1, 2));

  // "dobrynia" still has 40KB
  ASSERT_OK(Put(1, Key(2), DummyString(20000)));
  ASSERT_OK(Put(0, Key(1), DummyString(10000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_flushes(0));
  // This should trigger no flush
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 1, 2));

  // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
  ASSERT_OK(Put(1, Key(2), DummyString(40000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_flushes(0));
  // This should trigger a flush of "pikachu"
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 1, 1, 2));

  // "default": 10KB, "dobrynia": 40KB
  // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
  // closure.
  ASSERT_OK(Put(3, Key(1), DummyString(1)));
  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
                           options);
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(1, 1, 2, 3));
}
// Instantiates the DBTestSharedWriteBufferAcrossCFs suite with the parameter
// values produced by ::testing::Bool().
INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
DBTestSharedWriteBufferAcrossCFs, ::testing::Bool());
// Opens a second DB with the same Options object (and therefore the same
// WriteBufferManager instance, limited to 100000 bytes) as the fixture's DB,
// then checks via SST file counts which DB / column family gets flushed as
// writes to both accumulate.
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
  std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2";
  Options options = CurrentOptions();
  options.write_buffer_size = 500000;  // this is never hit
  options.write_buffer_manager.reset(new WriteBufferManager(100000));
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  ASSERT_OK(DestroyDB(dbname2, options));
  DB* raw_db2 = nullptr;
  ASSERT_OK(DB::Open(options, dbname2, &raw_db2));
  // Owned by unique_ptr so the second DB is closed even when an ASSERT_*
  // below returns from the test body early.
  std::unique_ptr<DB> db2(raw_db2);

  WriteOptions wo;

  // Waits for pending memtable flushes in all three column families of the
  // fixture DB and in db2, failing the test if any wait reports an error.
  auto wait_for_all_flushes = [this, &db2]() {
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
    ASSERT_OK(static_cast<DBImpl*>(db2.get())->TEST_WaitForFlushMemTable());
  };

  // Asserts the SST file counts of the fixture DB's column families and of
  // db2's default column family.
  auto expect_sst_counts = [this, &db2](uint64_t in_default, uint64_t in_cf1,
                                        uint64_t in_cf2,
                                        uint64_t in_db2_default) {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), in_default);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), in_cf1);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), in_cf2);
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2.get(), "default"),
              in_db2_default);
  };

  // Trigger a flush on cf2
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(2, Key(1), DummyString(90000)));
  // Insert to DB2
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_all_flushes());
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 1, 0));

  // db2: 20KB
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000)));
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
  ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_all_flushes());
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 1, 1));

  // Inserting Data in db2 and db_ triggers flushing in db_.
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
  ASSERT_OK(Put(2, Key(2), DummyString(45000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_NO_FATAL_FAILURE(wait_for_all_flushes());
  ASSERT_NO_FATAL_FAILURE(expect_sst_counts(0, 0, 2, 1));

  // Close db2 before destroying its files.
  db2.reset();
  ASSERT_OK(DestroyDB(dbname2, options));
}
namespace {
// Looks up both key sets with MultiGet and asserts that every key in
// keys_must_exist is returned with an OK status while every key in
// keys_must_not_exist is reported as NotFound. An empty set is skipped
// without touching the DB.
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
                          const std::vector<Slice>& keys_must_not_exist) {
  // Receives the fetched values; only the per-key statuses are inspected.
  std::vector<std::string> fetched;

  // Keys that have to be present.
  if (!keys_must_exist.empty()) {
    std::vector<Status> present =
        db->MultiGet(ReadOptions(), keys_must_exist, &fetched);
    const size_t expected = keys_must_exist.size();
    for (size_t idx = 0; idx != expected; ++idx) {
      ASSERT_OK(present[idx]);
    }
  }

  // Keys that have to be absent.
  if (!keys_must_not_exist.empty()) {
    std::vector<Status> absent =
        db->MultiGet(ReadOptions(), keys_must_not_exist, &fetched);
    const size_t expected = keys_must_not_exist.size();
    for (size_t idx = 0; idx != expected; ++idx) {
      ASSERT_TRUE(absent[idx].IsNotFound());
    }
  }
}
}  // namespace
// For every WalFilter::WalProcessingOption value below
// kWalProcessingOptionMax: writes three two-key batches, reopens the DB with
// a WAL filter that returns that option for the record at index 1 (and
// kContinueProcessing for all other records), and checks which keys are
// present after recovery and again after one more reopen without the filter.
TEST_F(DBTest2, WalFilterTest) {
  class TestWalFilter : public WalFilter {
   private:
    // Processing option that is requested to be applied at the given index
    WalFilter::WalProcessingOption wal_processing_option_;
    // Index at which to apply wal_processing_option_
    // At other indexes default wal_processing_option::kContinueProcessing is
    // returned.
    size_t apply_option_at_record_index_;
    // Current record index, incremented with each record encountered.
    // Declared mutable because LogRecord() is a const member function yet
    // has to advance this counter.
    mutable size_t current_record_index_;

   public:
    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
                  size_t apply_option_for_record_index)
        : wal_processing_option_(wal_processing_option),
          apply_option_at_record_index_(apply_option_for_record_index),
          current_record_index_(0) {}

    // Returns the configured option for the record at the configured index
    // and kContinueProcessing otherwise; the batch itself is never changed.
    WalProcessingOption LogRecord(const WriteBatch& /*batch*/,
                                  WriteBatch* /*new_batch*/,
                                  bool* /*batch_changed*/) const override {
      const WalProcessingOption option_to_return =
          (current_record_index_ == apply_option_at_record_index_)
              ? wal_processing_option_
              : WalProcessingOption::kContinueProcessing;
      ++current_record_index_;
      return option_to_return;
    }

    const char* Name() const override { return "TestWalFilter"; }
  };

  // Create 3 batches with two keys each
  std::vector<std::vector<std::string>> batch_keys(3);
  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  // Test with all WAL processing options
  for (int option = 0;
       option < static_cast<int>(
                    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
       option++) {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Write given keys in given batches
    for (size_t i = 0; i < batch_keys.size(); i++) {
      WriteBatch batch;
      for (size_t j = 0; j < batch_keys[i].size(); j++) {
        batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
      }
      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    }

    WalFilter::WalProcessingOption wal_processing_option =
        static_cast<WalFilter::WalProcessingOption>(option);

    // Create a test filter that applies wal_processing_option to the record
    // at index 1 (record indexes are zero-based, so this is the second one).
    size_t apply_option_for_record_index = 1;
    TestWalFilter test_wal_filter(wal_processing_option,
                                  apply_option_for_record_index);

    // Reopen database with option to use WAL filter
    options = OptionsForLogIterTest();
    options.wal_filter = &test_wal_filter;
    Status status =
        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
    // gtest assertions are used instead of assert() so the checks remain
    // active in builds that define NDEBUG.
    if (wal_processing_option ==
        WalFilter::WalProcessingOption::kCorruptedRecord) {
      ASSERT_FALSE(status.ok());
      // In case of corruption we can turn off paranoid_checks to reopen
      // the database
      options.paranoid_checks = false;
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    } else {
      ASSERT_OK(status);
    }

    // Compute which keys we expect to be found
    // and which we expect not to be found after recovery.
    std::vector<Slice> keys_must_exist;
    std::vector<Slice> keys_must_not_exist;
    switch (wal_processing_option) {
      case WalFilter::WalProcessingOption::kCorruptedRecord:
      case WalFilter::WalProcessingOption::kContinueProcessing: {
        fprintf(stderr, "Testing with complete WAL processing\n");
        // we expect all records to be processed
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            keys_must_exist.push_back(Slice(batch_keys[i][j]));
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
        fprintf(stderr,
                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
                apply_option_for_record_index);
        // We expect the record with apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i == apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kStopReplay: {
        fprintf(stderr,
                "Testing with stopping replay from record %" ROCKSDB_PRIszt
                "\n",
                apply_option_for_record_index);
        // We expect records beyond apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i >= apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      default:
        FAIL() << "unhandled WalProcessingOption " << option;
    }

    // Ensure that expected keys exist and unexpected keys don't exist right
    // after recovery.
    ASSERT_NO_FATAL_FAILURE(
        ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist));

    // Reopen the database again, this time without the WAL filter, to make
    // sure previous log(s) are not used (even if they were skipped), then
    // validate once more.
    options = OptionsForLogIterTest();
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_NO_FATAL_FAILURE(
        ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist));
  }
}
// Writes three two-key batches, then reopens the DB with a WAL filter that,
// from record index 1 onwards, replaces each batch with one containing only
// its first key. Checks which keys are present after recovery and again after
// one more reopen without the filter.
TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
  // WriteBatch handler that copies at most a fixed number of Put entries
  // into another batch.
  class ChangeBatchHandler : public WriteBatch::Handler {
   private:
    // Batch to insert keys in
    WriteBatch* new_write_batch_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Number of keys added to new batch
    size_t num_keys_added_;

   public:
    ChangeBatchHandler(WriteBatch* new_write_batch,
                       size_t num_keys_to_add_in_new_batch)
        : new_write_batch_(new_write_batch),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          num_keys_added_(0) {}

    // Forwards the entry to the new batch until the quota is used up; later
    // entries are dropped.
    void Put(const Slice& key, const Slice& value) override {
      if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
        new_write_batch_->Put(key, value);
        ++num_keys_added_;
      }
    }
  };

  class TestWalFilterWithChangeBatch : public WalFilter {
   private:
    // Index at which to start changing records
    size_t change_records_from_index_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Current record index, incremented with each record encountered.
    // Declared mutable because LogRecord() is a const member function yet
    // has to advance this counter.
    mutable size_t current_record_index_;

   public:
    TestWalFilterWithChangeBatch(size_t change_records_from_index,
                                 size_t num_keys_to_add_in_new_batch)
        : change_records_from_index_(change_records_from_index),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          current_record_index_(0) {}

    // For records at or past change_records_from_index_, fills new_batch
    // with the leading keys of batch and flags the batch as changed. Always
    // asks for processing to continue.
    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      if (current_record_index_ >= change_records_from_index_) {
        ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
        batch.Iterate(&handler);
        *batch_changed = true;
      }
      ++current_record_index_;
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "TestWalFilterWithChangeBatch";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);
  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  // Create a test filter that rewrites every record from index 1 onwards so
  // that only the first key of each such batch is kept.
  size_t change_records_from_index = 1;
  size_t num_keys_to_add_in_new_batch = 1;
  TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
      change_records_from_index, num_keys_to_add_in_new_batch);

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_with_change_batch;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Ensure that all keys exist before change_records_from_index_
  // And after that index only single key exists
  // as our filter adds only single key for each batch
  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;
  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
        keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
      } else {
        keys_must_exist.push_back(Slice(batch_keys[i][j]));
      }
    }
  }

  // Ensure that expected keys exist and unexpected keys don't exist right
  // after recovery.
  ASSERT_NO_FATAL_FAILURE(
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist));

  // Reopen the database again, this time without the WAL filter, to make
  // sure previous log(s) are not used (even if they were skipped), then
  // validate once more.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_NO_FATAL_FAILURE(
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist));
}
// Reopens the DB with a WAL filter that appends an extra key to every
// recovered batch and expects the reopen to fail with NotSupported. Then
// reopens without the filter and checks that all originally written keys are
// still present.
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
  class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
   public:
    // Copies the incoming batch, adds one extra Put and flags it as changed.
    WalProcessingOption LogRecord(const WriteBatch& batch,
                                  WriteBatch* new_batch,
                                  bool* batch_changed) const override {
      *new_batch = batch;
      new_batch->Put("key_extra", "value_extra");
      *batch_changed = true;
      return WalProcessingOption::kContinueProcessing;
    }

    const char* Name() const override {
      return "WalFilterTestWithChangeBatchExtraKeys";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);
  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
  }

  // Create a test filter that would add extra keys
  TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_extra_keys;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.IsNotSupported());

  // Reopen without filter, now reopen should succeed - previous
  // attempt to open must not have altered the db.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;  // empty vector
  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      keys_must_exist.push_back(Slice(batch_keys[i][j]));
    }
  }
  ASSERT_NO_FATAL_FAILURE(
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist));
}
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
class TestWalFilterWithColumnFamilies : public WalFilter {
private:
// column_family_id -> log_number map (provided to WALFilter)
std::map<uint32_t, uint64_t> cf_log_number_map_;
// column_family_name -> column_family_id map (provided to WALFilter)
std::map<std::string, uint32_t> cf_name_id_map_;
// column_family_name -> keys_found_in_wal map
// We store keys that are applicable to the column_family
// during recovery (i.e. aren't already flushed to SST file(s))
// for verification against the keys we expect.
std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
public:
virtual void ColumnFamilyLogNumberMap(
const std::map<uint32_t, uint64_t>& cf_lognumber_map,
const std::map<std::string, uint32_t>& cf_name_id_map) override {
cf_log_number_map_ = cf_lognumber_map;
cf_name_id_map_ = cf_name_id_map;
}
virtual WalProcessingOption LogRecordFound(unsigned long long log_number,
const std::string& log_file_name,
const WriteBatch& batch,
WriteBatch* new_batch,
bool* batch_changed) override {
class LogRecordBatchHandler : public WriteBatch::Handler {
private:
const std::map<uint32_t, uint64_t> & cf_log_number_map_;
std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
unsigned long long log_number_;
public:
LogRecordBatchHandler(unsigned long long current_log_number,
const std::map<uint32_t, uint64_t> & cf_log_number_map,
std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
cf_log_number_map_(cf_log_number_map),
cf_wal_keys_(cf_wal_keys),
log_number_(current_log_number){}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& /*value*/) override {
auto it = cf_log_number_map_.find(column_family_id);
assert(it != cf_log_number_map_.end());
unsigned long long log_number_for_cf = it->second;
// If the current record is applicable for column_family_id
// (i.e. isn't flushed to SST file(s) for column_family_id)
// add it to the cf_wal_keys_ map for verification.
if (log_number_ >= log_number_for_cf) {
cf_wal_keys_[column_family_id].push_back(std::string(key.data(),
key.size()));
}
return Status::OK();
}
} handler(log_number, cf_log_number_map_, cf_wal_keys_);
batch.Iterate(&handler);
return WalProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "WalFilterTestWithColumnFamilies";
}
const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
return cf_wal_keys_;
}
const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
return cf_name_id_map_;
}
};
std::vector<std::vector<std::string>> batch_keys_pre_flush(3);
batch_keys_pre_flush[0].push_back("key1");
batch_keys_pre_flush[0].push_back("key2");
batch_keys_pre_flush[1].push_back("key3");
batch_keys_pre_flush[1].push_back("key4");
batch_keys_pre_flush[2].push_back("key5");
batch_keys_pre_flush[2].push_back("key6");
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
//Flush default column-family
db_->Flush(FlushOptions(), handles_[0]);
// Do some more writes
std::vector<std::vector<std::string>> batch_keys_post_flush(3);
batch_keys_post_flush[0].push_back("key7");
batch_keys_post_flush[0].push_back("key8");
batch_keys_post_flush[1].push_back("key9");
batch_keys_post_flush[1].push_back("key10");
batch_keys_post_flush[2].push_back("key11");
batch_keys_post_flush[2].push_back("key12");
// Write given keys in given batches
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
// On Recovery we should only find the second batch applicable to default CF
// But both batches applicable to pikachu CF
// Create a test filter that would add extra keys
TestWalFilterWithColumnFamilies test_wal_filter_column_families;
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_column_families;
Status status =
TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
ASSERT_TRUE(status.ok());
// verify that handles_[0] only has post_flush keys
// while handles_[1] has pre and post flush keys
auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
size_t index = 0;
auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
//default column-family, only post_flush keys are expected
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
ASSERT_TRUE(index == keys_cf.size());
index = 0;
keys_cf = cf_wal_keys[name_id_map["pikachu"]];
//pikachu column-family, all keys are expected
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_pre_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
ASSERT_TRUE(index == keys_cf.size());
}
// Checks that a preset compression dictionary is really applied. For each
// compression type available in this build, the same data is written, flushed
// into kNumL0Files L0 files and compacted twice: first with
// max_dict_bytes == 0, then with max_dict_bytes == kBlockSizeBytes. The second
// run must produce fewer total SST bytes than the first, and in both runs the
// values read back through Get() must equal what was written.
TEST_F(DBTest2, PresetCompressionDict) {
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  options.arena_block_size = kBlockSizeBytes;
  options.compaction_style = kCompactionStyleUniversal;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;
  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;

  // Collect only the compression types this build can exercise.
  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif                           // LZ4_VERSION_NUMBER >= 10400
#if ZSTD_VERSION_NUMBER >= 500   // v0.5.0+
  compression_types.push_back(kZSTDNotFinalCompression);
#endif                           // ZSTD_VERSION_NUMBER >= 500

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    // Total SST size of the previous iteration. It is only compared in the
    // second iteration, but is initialized so it never holds an
    // indeterminate value.
    size_t prev_out_bytes = 0;
    for (int i = 0; i < 2; ++i) {
      // First iteration: compress without preset dictionary
      // Second iteration: compress with preset dictionary
      // To make sure the compression dictionary was actually used, we verify
      // the compressed size is smaller in the second iteration. Also in the
      // second iteration, verify the data we get out is the same data we put
      // in.
      if (i) {
        options.compression_opts.max_dict_bytes = kBlockSizeBytes;
      } else {
        options.compression_opts.max_dict_bytes = 0;
      }

      options.statistics = rocksdb::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      std::string seq_data =
          RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);

      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          ASSERT_OK(Put(1, Key(static_cast<int>(
                               j * (kL0FileBytes / kBlockSizeBytes) + k)),
                        seq_data));
        }
        // Surface flush errors instead of silently dropping the Status.
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }
      // Compact the whole "pikachu" column family; a failed compaction must
      // fail the test rather than be ignored.
      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                  nullptr));
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      // Sum the on-disk size of every SST file in the DB directory.
      size_t out_bytes = 0;
      std::vector<std::string> files;
      GetSstFiles(dbname_, &files);
      for (const auto& file : files) {
        uint64_t curr_bytes = 0;
        // If the size query fails, curr_bytes would be meaningless, so stop.
        ASSERT_OK(env_->GetFileSize(dbname_ + "/" + file, &curr_bytes));
        out_bytes += static_cast<size_t>(curr_bytes);
      }

      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_data, Get(1, Key(static_cast<int>(j))));
      }
      if (i) {
        ASSERT_GT(prev_out_bytes, out_bytes);
      }
      prev_out_bytes = out_bytes;
      DestroyAndReopen(options);
    }
  }
}
// Event listener that, for every completed compaction, asserts that the
// compression type reported in CompactionJobInfo matches what the supplied
// Options select for the compaction's output level. It also records the
// highest output level seen so far in max_level_checked.
class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Determine the deepest level that currently reports a non-zero file
    // count through the "rocksdb.num-files-at-level<N>" property.
    int bottommost_level = 0;
    for (int lvl = 0; lvl < db->NumberLevels(); ++lvl) {
      std::string file_count;
      ASSERT_TRUE(db->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(lvl), &file_count));
      if (file_count != "0") {
        bottommost_level = lvl;
      }
    }

    // bottommost_compression takes precedence only when it is enabled and the
    // output landed on the last non-empty level, which must be level 2 or
    // deeper.
    const bool uses_bottommost_setting =
        db_options_->bottommost_compression != kDisableCompressionOption &&
        ci.output_level == bottommost_level && ci.output_level >= 2;

    if (uses_bottommost_setting) {
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (!db_options_->compression_per_level.empty()) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }

    if (max_level_checked < ci.output_level) {
      max_level_checked = ci.output_level;
    }
  }

  // Highest compaction output level verified so far; reset by the test.
  int max_level_checked = 0;
  // Options the assertions are evaluated against; not owned.
  const Options* db_options_;
};
// Exercises three compression configurations (per-level list, a single type
// with a distinct bottommost type, and a single type everywhere). For each one
// it writes ten small files and relies on CompactionCompressionListener to
// check the compression type of every completed compaction. Skipped entirely
// unless both Zlib and Snappy are supported.
TEST_F(DBTest2, CompressionOptions) {
  if (!(Zlib_Supported() && Snappy_Supported())) {
    return;
  }

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;
  options.base_background_compactions = 1;

  // The listener keeps a pointer to `options`, so it observes the per-config
  // changes made below.
  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);

  for (int config = 0; config <= 2; ++config) {
    listener->max_level_checked = 0;

    switch (config) {
      case 0:
        // Use different compression algorithms for different levels but
        // always use Zlib for bottommost level
        options.compression_per_level = {kNoCompression,     kNoCompression,
                                         kNoCompression,     kSnappyCompression,
                                         kSnappyCompression, kSnappyCompression,
                                         kZlibCompression};
        options.compression = kNoCompression;
        options.bottommost_compression = kZlibCompression;
        break;
      case 1:
        // Use Snappy except for bottommost level use ZLib
        options.compression_per_level.clear();
        options.compression = kSnappyCompression;
        options.bottommost_compression = kZlibCompression;
        break;
      case 2:
        // Use Snappy everywhere
        options.compression_per_level.clear();
        options.compression = kSnappyCompression;
        options.bottommost_compression = kDisableCompressionOption;
        break;
    }

    DestroyAndReopen(options);

    // Write 10 random files, five entries each, waiting for compactions to
    // settle after every flush.
    for (int file = 0; file < 10; ++file) {
      for (int entry = 0; entry < 5; ++entry) {
        ASSERT_OK(
            Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
      }
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForCompact();
    }

    // Make sure that we wrote enough to check all 7 levels
    ASSERT_EQ(listener->max_level_checked, 6);
  }
}
// Event listener used by the CompactionStall test. Every completed compaction
// must belong to the "default" column family, start from level 0 and have
// been triggered by the L0 file count; the number of input files is added to
// an atomic running total.
class CompactionStallTestListener : public EventListener {
 public:
  CompactionStallTestListener() : compacted_files_cnt_(0) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacted_files_cnt_.fetch_add(ci.input_files.size());
  }

  // Total number of input files across all compactions reported so far.
  std::atomic<size_t> compacted_files_cnt_;
};
// Writes ten L0 files in two bursts (4, then 6) with an L0 compaction trigger
// of 4, using sync points to order the test thread against background
// compaction work, and finally checks that L0 ended up below the trigger and
// that the listener saw more than (10 - trigger) compacted input files.
TEST_F(DBTest2, CompactionStall) {
// Each pair {A, B} is expected to make B wait until A has been reached:
// the test waits for a background compaction to start (":0" and ":1"), and
// the compaction-completed notification waits for the test (":2").
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
{"DBTest2::CompactionStall:2",
"DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 4;
options.max_background_compactions = 40;
// The raw pointer is kept for later inspection; options.listeners stores it
// as well.
CompactionStallTestListener* listener = new CompactionStallTestListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
Random rnd(301);
// 4 Files in L0
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
ASSERT_OK(Flush());
}
// Wait for compaction to be triggered
TEST_SYNC_POINT("DBTest2::CompactionStall:0");
// Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
// at DBTest2::CompactionStall:1
rocksdb::SyncPoint::GetInstance()->ClearTrace();
// Another 6 L0 files to trigger compaction again
for (int i = 0; i < 6; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
ASSERT_OK(Flush());
}
// Wait for another compaction to be triggered
TEST_SYNC_POINT("DBTest2::CompactionStall:1");
// Hold NotifyOnCompactionCompleted in the unlock mutex section
TEST_SYNC_POINT("DBTest2::CompactionStall:2");
dbfull()->TEST_WaitForCompact();
// After all compactions finish, L0 must hold fewer files than the trigger.
ASSERT_LT(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
// Of the 10 files written, more than (10 - trigger) must have been used as
// compaction inputs.
ASSERT_GT(listener->compacted_files_cnt_.load(),
10 - options.level0_file_num_compaction_trigger);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif // ROCKSDB_LITE
// Takes a snapshot on a freshly created DB before any write, then writes two
// values sized like the write buffer, and finally releases the snapshot.
TEST_F(DBTest2, FirstSnapshotTest) {
  Options options;
  options.write_buffer_size = 100000;  // Small write buffer
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // This snapshot will have sequence number 0, which is the expected
  // behaviour.
  const Snapshot* s1 = db_->GetSnapshot();

  // Check the write Status so a failed Put cannot go unnoticed.
  ASSERT_OK(Put(1, "k1", std::string(100000, 'x')));  // Fill memtable
  ASSERT_OK(Put(1, "k2", std::string(100000, 'y')));  // Trigger flush

  db_->ReleaseSnapshot(s1);
}
// Bool-parameterized fixture for the L0 index/filter pinning tests. The
// parameter is copied into infinite_max_files_ during SetUp(); the tests set
// max_open_files to -1 when it is true.
class PinL0IndexAndFilterBlocksTest
    : public DBTestBase,
      public testing::WithParamInterface<bool> {
 public:
  PinL0IndexAndFilterBlocksTest()
      : DBTestBase("/db_pin_l0_index_bloom_test") {}

  void SetUp() override { infinite_max_files_ = GetParam(); }

  // Value of the test parameter for the currently running case.
  bool infinite_max_files_;
};
// With index/filter caching and L0 pinning enabled, flushing one key must
// record exactly one filter miss and one index miss (two cache adds, no data
// miss). Subsequent KeyMayExist() and Get() calls must leave those
// hit/miss tickers unchanged.
TEST_P(PinL0IndexAndFilterBlocksTest,
       IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
  Options options = CurrentOptions();
  if (infinite_max_files_) {
    options.max_open_files = -1;
  }
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();

  BlockBasedTableOptions bbt_opts;
  bbt_opts.cache_index_and_filter_blocks = true;
  bbt_opts.pin_l0_filter_and_index_blocks_in_cache = true;
  bbt_opts.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts));
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "key", "val"));
  // Create a new table.
  ASSERT_OK(Flush(1));

  // index/filter blocks added to block cache right after table creation.
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // only index/filter were added
  ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));

  // Miss and hit count should remain the same, they're all pinned.
  std::string result;
  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &result);
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // Miss and hit count should remain the same, they're all pinned.
  result = Get(1, "key");
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
// Places one table in L1 and, after reopening with a fresh block cache, one
// table in L0. A read served from L0 must not change the index/filter
// hit/miss tickers, whereas a read served from L1 must add one filter miss
// and one index miss.
TEST_P(PinL0IndexAndFilterBlocksTest,
       MultiLevelIndexAndFilterBlocksCachedWithPinning) {
  Options options = CurrentOptions();
  if (infinite_max_files_) {
    options.max_open_files = -1;
  }
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  // Every write is checked: the ticker assertions below are meaningless if
  // the data never reached the DB.
  ASSERT_OK(Put(1, "a", "begin"));
  ASSERT_OK(Put(1, "z", "end"));
  ASSERT_OK(Flush(1));
  // move this table to L1
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));

  // reset block cache
  table_options.block_cache = NewLRUCache(64 * 1024);
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  // A failed reopen would leave the rest of the test running against an
  // unusable DB, so stop here if it does not succeed.
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
  // create new table at L0
  ASSERT_OK(Put(1, "a2", "begin2"));
  ASSERT_OK(Put(1, "z2", "end2"));
  ASSERT_OK(Flush(1));

  table_options.block_cache->EraseUnRefEntries();

  // get base cache values
  uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
  uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
  uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
  uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);

  std::string value;
  // this should be read from L0
  // so cache values don't change
  value = Get(1, "a2");
  ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // this should be read from L1
  // the file is opened, prefetching results in a cache filter miss
  // the block is loaded and added to the cache,
  // then the get results in a cache hit for L1
  value = Get(1, "a");
  ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}
// Instantiate the PinL0IndexAndFilterBlocksTest cases over the values
// produced by ::testing::Bool(); the parameter becomes infinite_max_files_.
INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
PinL0IndexAndFilterBlocksTest, ::testing::Bool());
#ifndef ROCKSDB_LITE
static void UniqueIdCallback(void* arg) {
int* result = reinterpret_cast<int*>(arg);
if (*result == -1) {
*result = 0;
}
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}
// In-memory PersistentCache stand-in for tests. Pages are kept in a
// std::map keyed by the page key; all operations are serialized by lock_.
// Construction enables sync-point processing and installs UniqueIdCallback
// on "GetUniqueIdFromFile:FS_IOC_GETVERSION".
class MockPersistentCache : public PersistentCache {
 public:
  // is_compressed: value reported by IsCompressed().
  // max_size: byte budget; once the stored bytes exceed it, the next Insert()
  //           evicts one entry before storing the new page.
  explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
      : is_compressed_(is_compressed), max_size_(max_size) {
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  }

  virtual ~MockPersistentCache() {}

  // Stores a copy of data[0, size) under page_key. If the cache is currently
  // over budget, the entry with the smallest key is evicted first. An
  // existing entry for page_key is left untouched. Always returns OK.
  Status Insert(const Slice& page_key, const char* data,
                const size_t size) override {
    MutexLock _(&lock_);

    if (size_ > max_size_) {
      size_ -= data_.begin()->second.size();
      data_.erase(data_.begin());
    }

    // std::map::insert does not overwrite an existing key, so only count the
    // bytes when the page was really stored. Counting unconditionally would
    // make size_ drift above the bytes actually held and cause needless
    // evictions.
    const auto result = data_.insert(
        std::make_pair(page_key.ToString(), std::string(data, size)));
    if (result.second) {
      size_ += size;
    }
    return Status::OK();
  }

  // Looks up page_key. On a hit, *data receives a newly allocated copy of the
  // page and *size its length; on a miss, NotFound is returned and the
  // outputs are left unchanged.
  Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                size_t* size) override {
    MutexLock _(&lock_);
    auto it = data_.find(page_key.ToString());
    if (it == data_.end()) {
      return Status::NotFound();
    }

    assert(page_key.ToString() == it->first);
    data->reset(new char[it->second.size()]);
    memcpy(data->get(), it->second.c_str(), it->second.size());
    *size = it->second.size();
    return Status::OK();
  }

  bool IsCompressed() override { return is_compressed_; }

  port::Mutex lock_;                         // guards data_ and size_
  std::map<std::string, std::string> data_;  // page key -> page bytes
  const bool is_compressed_ = true;
  size_t size_ = 0;                    // total bytes currently held in data_
  const size_t max_size_ = 10 * 1024;  // 10KiB
};
// Runs reads through a MockPersistentCache for every combination of
// {no block cache, 1MB block cache} x {compressed, uncompressed} persistent
// cache and checks that both PERSISTENT_CACHE_HIT and PERSISTENT_CACHE_MISS
// tickers end up above zero.
TEST_F(DBTest2, PersistentCache) {
  const int kNumValues = 80;

  Options options;
  options.write_buffer_size = 64 * 1024;  // small write buffer
  options.statistics = rocksdb::CreateDBStatistics();
  options = CurrentOptions(options);

  auto block_cache_sizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
  auto cache_kinds = {/*compressed*/ 1, /*uncompressed*/ 0};

  for (auto block_cache_size : block_cache_sizes) {
    for (auto cache_kind : cache_kinds) {
      BlockBasedTableOptions table_options;
      table_options.persistent_cache.reset(
          new MockPersistentCache(cache_kind, 10 * 1024));
      table_options.no_block_cache = true;
      if (block_cache_size) {
        table_options.block_cache = NewLRUCache(block_cache_size);
      } else {
        table_options.block_cache = nullptr;
      }
      table_options.block_cache_compressed = nullptr;
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));

      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);

      // default column family doesn't have block cache
      Options default_cf_opts;
      default_cf_opts.statistics = options.statistics;
      default_cf_opts = CurrentOptions(default_cf_opts);
      BlockBasedTableOptions default_cf_table_opts;
      default_cf_table_opts.no_block_cache = true;
      default_cf_opts.table_factory.reset(
          NewBlockBasedTableFactory(default_cf_table_opts));
      ReopenWithColumnFamilies(
          {"default", "pikachu"},
          std::vector<Options>({default_cf_opts, options}));

      Random rnd(301);

      // Write 80 values of 1000 bytes each. A new random string is generated
      // only every fourth key, so runs of four consecutive keys share the
      // same value.
      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
      std::vector<std::string> expected;
      std::string current;
      for (int i = 0; i < kNumValues; ++i) {
        if (i % 4 == 0) {  // high compression ratio
          current = RandomString(&rnd, 1000);
        }
        expected.push_back(current);
        ASSERT_OK(Put(1, Key(i), expected.back()));
      }

      // flush all data from memtable so that reads are from block cache
      ASSERT_OK(Flush(1));

      for (int i = 0; i < kNumValues; ++i) {
        ASSERT_EQ(Get(1, Key(i)), expected[i]);
      }

      auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
      auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);

      ASSERT_GT(hit, 0);
      ASSERT_GT(miss, 0);
    }
  }
}
namespace {
// Triggers the "DBTest2::MarkedPoint" sync-point callback with a null
// argument. Both worker lambdas in the SyncPointMarker test call this
// function, so they share a single sync-point name.
void CountSyncPoint() {
TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
}
} // namespace
// Two threads both reach "DBTest2::MarkedPoint" through CountSyncPoint(), but
// with the dependency and marker loaded below the callback registered for
// that point is expected to run exactly once.
TEST_F(DBTest2, SyncPointMarker) {
  rocksdb::SyncPoint* const sync_point = rocksdb::SyncPoint::GetInstance();

  std::atomic<int> callback_hits(0);
  sync_point->SetCallBack("DBTest2::MarkedPoint", [&](void* /*arg*/) {
    callback_hits.fetch_add(1);
  });

  // The first dependency enforces Marker can be loaded before MarkedPoint.
  // The second checks that thread 1's MarkedPoint should be disabled here.
  // Execution order:
  // |   Thread 1    |  Thread 2   |
  // |               |   Marker    |
  // |  MarkedPoint  |             |
  // | Thread1First  |             |
  // |               | MarkedPoint |
  sync_point->LoadDependencyAndMarkers(
      {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
      {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
  sync_point->EnableProcessing();

  // Thread 1: hits MarkedPoint first, then announces Thread1First.
  std::function<void()> first_worker = [&]() {
    CountSyncPoint();
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
  };

  // Thread 2: passes the Marker, then hits MarkedPoint.
  std::function<void()> second_worker = [&]() {
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
    CountSyncPoint();
  };

  std::thread t1(first_worker);
  std::thread t2(second_worker);
  t1.join();
  t2.join();

  // Callback is only executed once
  ASSERT_EQ(callback_hits.load(), 1);
  sync_point->DisableProcessing();
}
#endif
// Bool-parameterized fixture for the merge-operator pinning tests. The
// parameter is stored in disable_block_cache_, which the tests copy into
// BlockBasedTableOptions::no_block_cache.
class MergeOperatorPinningTest
    : public DBTest2,
      public testing::WithParamInterface<bool> {
 public:
  MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }

  // True when the current test case should run without a block cache.
  bool disable_block_cache_;
};

// Instantiate the fixture's test cases over the values of ::testing::Bool().
INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
                        ::testing::Bool());
#ifndef ROCKSDB_LITE
// Spreads merge operands for 35 keys across files on levels 0-4 plus the
// memtable, with a block size of 1 so every entry lands in its own block,
// then verifies Get(), forward iteration, backward iteration and Seek()
// against an in-memory model of the expected string-append results.
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  // Expected value per key: operands joined with ','.
  std::map<std::string, std::string> true_data;
  int batch_num = 1;
  int lvl_to_fill = 4;
  int key_id = 0;
  while (true) {
    for (int j = 0; j < kKeysPerFile; j++) {
      std::string key = Key(key_id % 35);
      key_id++;
      for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
        std::string val = RandomString(&rnd, kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));
        if (true_data[key].size() == 0) {
          true_data[key] = val;
        } else {
          true_data[key] += "," + val;
        }
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    if (batch_num % kFilesPerLevel == 0) {
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      lvl_to_fill--;
    }
    batch_num++;
  }

  // 3 L0 files
  // 1 L1 file
  // 3 L2 files
  // 1 L3 file
  // 3 L4 Files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  // Verify Get()
  for (const auto& kv : true_data) {
    ASSERT_EQ(Get(kv.first), kv.second);
  }

  // Own the iterator through unique_ptr so it is released even when a fatal
  // assertion returns from the test early.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));

  // Verify Iterator::Next()
  auto data_iter = true_data.begin();
  for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
    // Guard against the DB returning more keys than the model holds before
    // dereferencing the model iterator.
    ASSERT_NE(data_iter, true_data.end());
    ASSERT_EQ(iter->key().ToString(), data_iter->first);
    ASSERT_EQ(iter->value().ToString(), data_iter->second);
  }
  ASSERT_EQ(data_iter, true_data.end());

  // Verify Iterator::Prev()
  auto data_rev = true_data.rbegin();
  for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
    ASSERT_NE(data_rev, true_data.rend());
    ASSERT_EQ(iter->key().ToString(), data_rev->first);
    ASSERT_EQ(iter->value().ToString(), data_rev->second);
  }
  ASSERT_EQ(data_rev, true_data.rend());

  // Verify Iterator::Seek()
  for (const auto& kv : true_data) {
    iter->Seek(kv.first);
    // key()/value() may only be read from a valid iterator.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(kv.first, iter->key().ToString());
    ASSERT_EQ(kv.second, iter->value().ToString());
  }
}
// For every option configuration yielded by ChangeOptions(), applies a random
// mix of Put, Merge (max operator), overwriting Put and Delete operations to
// a small key range while mirroring them in an in-memory map, then checks the
// DB against that map with VerifyDBFromMap().
TEST_P(MergeOperatorPinningTest, Randomized) {
  do {
    Options options = CurrentOptions();
    options.merge_operator = MergeOperators::CreateMaxOperator();
    BlockBasedTableOptions bbt_opts;
    bbt_opts.no_block_cache = disable_block_cache_;
    options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts));
    DestroyAndReopen(options);

    Random rnd(301);
    // Model of the expected DB contents.
    std::map<std::string, std::string> expected;

    const int kTotalMerges = 10000;
    // Every key gets ~10 operands
    const int kKeyRange = kTotalMerges / 10;
    const int kOperandSize = 20;
    const int kNumPutBefore = kKeyRange / 10;  // 10% value
    const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
    const int kNumDelete = kKeyRange / 10;     // 10% delete

    // kNumPutBefore keys will have base values
    for (int n = 0; n < kNumPutBefore; ++n) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));
      expected[key] = value;
    }

    // Do kTotalMerges merges; the model keeps the larger of the current and
    // the new operand.
    for (int n = 0; n < kTotalMerges; ++n) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), key, value));
      std::string& current = expected[key];
      if (current < value) {
        current = value;
      }
    }

    // Overwrite random kNumPutAfter keys
    for (int n = 0; n < kNumPutAfter; ++n) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));
      expected[key] = value;
    }

    // Delete random kNumDelete keys
    for (int n = 0; n < kNumDelete; ++n) {
      std::string key = Key(rnd.Next() % kKeyRange);
      ASSERT_OK(db_->Delete(WriteOptions(), key));
      expected.erase(key);
    }

    VerifyDBFromMap(expected);

    // Skip HashCuckoo since it does not support merge operators
  } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo));
}
class MergeOperatorHook : public MergeOperator {
public:
explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
: merge_op_(_merge_op) {}
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
before_merge_();
bool res = merge_op_->FullMergeV2(merge_in, merge_out);
after_merge_();
return res;
}
virtual const char* Name() const override { return merge_op_->Name(); }
std::shared_ptr<MergeOperator> merge_op_;
std::function<void()> before_merge_ = []() {};
std::function<void()> after_merge_ = []() {};
};
// Writes kNumOperands merge operands per key, each flushed into its own file,
// then uses MergeOperatorHook to evict all table readers and shrink the block
// cache right before every full merge. The DB contents must still match the
// in-memory model, both before and after a full compaction.
TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  Options options = CurrentOptions();

  auto merge_hook =
      std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  options.merge_operator = merge_hook;
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.max_open_files = 20;
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  if (!bbto.no_block_cache) {
    bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  } else {
    bbto.block_cache = nullptr;
  }
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 30;
  const int kNumKeys = 1000;
  const int kOperandSize = 100;
  Random rnd(301);

  // 1000 keys every key have 30 operands, every operand is in a different file
  std::map<std::string, std::string> true_data;
  for (int i = 0; i < kNumOperands; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      std::string k = Key(j);
      std::string v = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), k, v));

      true_data[k] = std::max(true_data[k], v);
    }
    ASSERT_OK(Flush());
  }

  std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  // Compare like with like: size() is unsigned, kNumOperands is an int.
  ASSERT_EQ(file_numbers.size(), static_cast<size_t>(kNumOperands));
  int merge_cnt = 0;

  // Code executed before merge operation
  merge_hook->before_merge_ = [&]() {
    // Evict all tables from cache before every merge operation
    for (uint64_t num : file_numbers) {
      TableCache::Evict(dbfull()->TEST_table_cache(), num);
    }
    // Decrease cache capacity to force all unrefed blocks to be evicted
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(1);
    }
    merge_cnt++;
  };

  // Code executed after merge operation
  merge_hook->after_merge_ = [&]() {
    // Increase capacity again after doing the merge
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(64 * 1024 * 1024);
    }
  };

  VerifyDBFromMap(true_data);
  ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */);

  // A failed compaction must fail the test instead of letting the second
  // verification run against an uncompacted DB.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyDBFromMap(true_data);
}
#endif // ROCKSDB_LITE
} // namespace rocksdb
// Test binary entry point: installs the RocksDB stack trace handler,
// initializes GoogleTest from the command line, and runs every registered
// test, returning GoogleTest's result as the process exit code.
int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}