mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 07:30:54 +00:00
68a8e6b8fa
Summary: This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice> This diff is stacked on top of D56493 and D56511 In this diff we - Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future - Replace std::deque<std::string> with std::vector<Slice> to pass operands - Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187) - Allow FullMergeV2 output to be an existing operand ``` [Everything in Memtable | 10K operands | 10 KB each | 1 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s [master] readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s ``` ``` [Everything in Memtable | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000 [FullMergeV2] readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s [master] readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 1 operand per key] [FullMergeV2] $ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s [master] readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s ``` ``` [Everything in Block cache | 10K operands | 10 KB each | 10 operand per key] DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions [FullMergeV2] readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s [master] readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s ``` Test Plan: COMPILE_WITH_ASAN=1 make check -j64 Reviewers: yhchiang, andrewkr, sdong Reviewed By: sdong Subscribers: lovro, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D57075
696 lines
22 KiB
C++
696 lines
22 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
#include <algorithm>
|
|
#include <deque>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/iterator.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "table/internal_iterator.h"
|
|
#include "table/plain_table_factory.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/random.h"
|
|
|
|
namespace rocksdb {
|
|
class SequentialFile;
|
|
class SequentialFileReader;
|
|
|
|
namespace test {
|
|
|
|
// Store in *dst a random string of length "len" and return a Slice that
|
|
// references the generated data.
|
|
extern Slice RandomString(Random* rnd, int len, std::string* dst);
|
|
|
|
extern std::string RandomHumanReadableString(Random* rnd, int len);
|
|
|
|
// Return a random key with the specified length that may contain interesting
|
|
// characters (e.g. \x00, \xff, etc.).
|
|
enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE };
|
|
extern std::string RandomKey(Random* rnd, int len,
|
|
RandomKeyType type = RandomKeyType::RANDOM);
|
|
|
|
// Store in *dst a string of length "len" that will compress to
|
|
// "N*compressed_fraction" bytes and return a Slice that references
|
|
// the generated data.
|
|
extern Slice CompressibleString(Random* rnd, double compressed_fraction,
|
|
int len, std::string* dst);
|
|
|
|
// A wrapper that allows injection of errors.
|
|
class ErrorEnv : public EnvWrapper {
|
|
public:
|
|
bool writable_file_error_;
|
|
int num_writable_file_errors_;
|
|
|
|
ErrorEnv() : EnvWrapper(Env::Default()),
|
|
writable_file_error_(false),
|
|
num_writable_file_errors_(0) { }
|
|
|
|
virtual Status NewWritableFile(const std::string& fname,
|
|
unique_ptr<WritableFile>* result,
|
|
const EnvOptions& soptions) override {
|
|
result->reset();
|
|
if (writable_file_error_) {
|
|
++num_writable_file_errors_;
|
|
return Status::IOError(fname, "fake error");
|
|
}
|
|
return target()->NewWritableFile(fname, result, soptions);
|
|
}
|
|
};
|
|
|
|
// An internal comparator that just forward comparing results from the
|
|
// user comparator in it. Can be used to test entities that have no dependency
|
|
// on internal key structure but consumes InternalKeyComparator, like
|
|
// BlockBasedTable.
|
|
class PlainInternalKeyComparator : public InternalKeyComparator {
|
|
public:
|
|
explicit PlainInternalKeyComparator(const Comparator* c)
|
|
: InternalKeyComparator(c) {}
|
|
|
|
virtual ~PlainInternalKeyComparator() {}
|
|
|
|
virtual int Compare(const Slice& a, const Slice& b) const override {
|
|
return user_comparator()->Compare(a, b);
|
|
}
|
|
virtual void FindShortestSeparator(std::string* start,
|
|
const Slice& limit) const override {
|
|
user_comparator()->FindShortestSeparator(start, limit);
|
|
}
|
|
virtual void FindShortSuccessor(std::string* key) const override {
|
|
user_comparator()->FindShortSuccessor(key);
|
|
}
|
|
};
|
|
|
|
// A test comparator which compare two strings in this way:
|
|
// (1) first compare prefix of 8 bytes in alphabet order,
|
|
// (2) if two strings share the same prefix, sort the other part of the string
|
|
// in the reverse alphabet order.
|
|
// This helps simulate the case of compounded key of [entity][timestamp] and
|
|
// latest timestamp first.
|
|
class SimpleSuffixReverseComparator : public Comparator {
|
|
public:
|
|
SimpleSuffixReverseComparator() {}
|
|
|
|
virtual const char* Name() const override {
|
|
return "SimpleSuffixReverseComparator";
|
|
}
|
|
|
|
virtual int Compare(const Slice& a, const Slice& b) const override {
|
|
Slice prefix_a = Slice(a.data(), 8);
|
|
Slice prefix_b = Slice(b.data(), 8);
|
|
int prefix_comp = prefix_a.compare(prefix_b);
|
|
if (prefix_comp != 0) {
|
|
return prefix_comp;
|
|
} else {
|
|
Slice suffix_a = Slice(a.data() + 8, a.size() - 8);
|
|
Slice suffix_b = Slice(b.data() + 8, b.size() - 8);
|
|
return -(suffix_a.compare(suffix_b));
|
|
}
|
|
}
|
|
virtual void FindShortestSeparator(std::string* start,
|
|
const Slice& limit) const override {}
|
|
|
|
virtual void FindShortSuccessor(std::string* key) const override {}
|
|
};
|
|
|
|
// Returns a user key comparator that can be used for comparing two uint64_t
|
|
// slices. Instead of comparing slices byte-wise, it compares all the 8 bytes
|
|
// at once. Assumes same endian-ness is used though the database's lifetime.
|
|
// Symantics of comparison would differ from Bytewise comparator in little
|
|
// endian machines.
|
|
extern const Comparator* Uint64Comparator();
|
|
|
|
// Iterator over a vector of keys/values
|
|
class VectorIterator : public InternalIterator {
|
|
public:
|
|
explicit VectorIterator(const std::vector<std::string>& keys)
|
|
: keys_(keys), current_(keys.size()) {
|
|
std::sort(keys_.begin(), keys_.end());
|
|
values_.resize(keys.size());
|
|
}
|
|
|
|
VectorIterator(const std::vector<std::string>& keys,
|
|
const std::vector<std::string>& values)
|
|
: keys_(keys), values_(values), current_(keys.size()) {
|
|
assert(keys_.size() == values_.size());
|
|
}
|
|
|
|
virtual bool Valid() const override { return current_ < keys_.size(); }
|
|
|
|
virtual void SeekToFirst() override { current_ = 0; }
|
|
virtual void SeekToLast() override { current_ = keys_.size() - 1; }
|
|
|
|
virtual void Seek(const Slice& target) override {
|
|
current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
|
|
keys_.begin();
|
|
}
|
|
|
|
virtual void Next() override { current_++; }
|
|
virtual void Prev() override { current_--; }
|
|
|
|
virtual Slice key() const override { return Slice(keys_[current_]); }
|
|
virtual Slice value() const override { return Slice(values_[current_]); }
|
|
|
|
virtual Status status() const override { return Status::OK(); }
|
|
|
|
private:
|
|
std::vector<std::string> keys_;
|
|
std::vector<std::string> values_;
|
|
size_t current_;
|
|
};
|
|
extern WritableFileWriter* GetWritableFileWriter(WritableFile* wf);
|
|
|
|
extern RandomAccessFileReader* GetRandomAccessFileReader(RandomAccessFile* raf);
|
|
|
|
extern SequentialFileReader* GetSequentialFileReader(SequentialFile* se);
|
|
|
|
class StringSink: public WritableFile {
|
|
public:
|
|
std::string contents_;
|
|
|
|
explicit StringSink(Slice* reader_contents = nullptr) :
|
|
WritableFile(),
|
|
contents_(""),
|
|
reader_contents_(reader_contents),
|
|
last_flush_(0) {
|
|
if (reader_contents_ != nullptr) {
|
|
*reader_contents_ = Slice(contents_.data(), 0);
|
|
}
|
|
}
|
|
|
|
const std::string& contents() const { return contents_; }
|
|
|
|
virtual Status Truncate(uint64_t size) override {
|
|
contents_.resize(static_cast<size_t>(size));
|
|
return Status::OK();
|
|
}
|
|
virtual Status Close() override { return Status::OK(); }
|
|
virtual Status Flush() override {
|
|
if (reader_contents_ != nullptr) {
|
|
assert(reader_contents_->size() <= last_flush_);
|
|
size_t offset = last_flush_ - reader_contents_->size();
|
|
*reader_contents_ = Slice(
|
|
contents_.data() + offset,
|
|
contents_.size() - offset);
|
|
last_flush_ = contents_.size();
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
virtual Status Sync() override { return Status::OK(); }
|
|
virtual Status Append(const Slice& slice) override {
|
|
contents_.append(slice.data(), slice.size());
|
|
return Status::OK();
|
|
}
|
|
void Drop(size_t bytes) {
|
|
if (reader_contents_ != nullptr) {
|
|
contents_.resize(contents_.size() - bytes);
|
|
*reader_contents_ = Slice(
|
|
reader_contents_->data(), reader_contents_->size() - bytes);
|
|
last_flush_ = contents_.size();
|
|
}
|
|
}
|
|
|
|
private:
|
|
Slice* reader_contents_;
|
|
size_t last_flush_;
|
|
};
|
|
|
|
// Like StringSink, this writes into a string. Unlink StringSink, it
|
|
// has some initial content and overwrites it, just like a recycled
|
|
// log file.
|
|
class OverwritingStringSink : public WritableFile {
|
|
public:
|
|
explicit OverwritingStringSink(Slice* reader_contents)
|
|
: WritableFile(),
|
|
contents_(""),
|
|
reader_contents_(reader_contents),
|
|
last_flush_(0) {}
|
|
|
|
const std::string& contents() const { return contents_; }
|
|
|
|
virtual Status Truncate(uint64_t size) override {
|
|
contents_.resize(static_cast<size_t>(size));
|
|
return Status::OK();
|
|
}
|
|
virtual Status Close() override { return Status::OK(); }
|
|
virtual Status Flush() override {
|
|
if (last_flush_ < contents_.size()) {
|
|
assert(reader_contents_->size() >= contents_.size());
|
|
memcpy((char*)reader_contents_->data() + last_flush_,
|
|
contents_.data() + last_flush_, contents_.size() - last_flush_);
|
|
last_flush_ = contents_.size();
|
|
}
|
|
return Status::OK();
|
|
}
|
|
virtual Status Sync() override { return Status::OK(); }
|
|
virtual Status Append(const Slice& slice) override {
|
|
contents_.append(slice.data(), slice.size());
|
|
return Status::OK();
|
|
}
|
|
void Drop(size_t bytes) {
|
|
contents_.resize(contents_.size() - bytes);
|
|
if (last_flush_ > contents_.size()) last_flush_ = contents_.size();
|
|
}
|
|
|
|
private:
|
|
std::string contents_;
|
|
Slice* reader_contents_;
|
|
size_t last_flush_;
|
|
};
|
|
|
|
class StringSource: public RandomAccessFile {
|
|
public:
|
|
explicit StringSource(const Slice& contents, uint64_t uniq_id = 0,
|
|
bool mmap = false)
|
|
: contents_(contents.data(), contents.size()),
|
|
uniq_id_(uniq_id),
|
|
mmap_(mmap),
|
|
total_reads_(0) {}
|
|
|
|
virtual ~StringSource() { }
|
|
|
|
uint64_t Size() const { return contents_.size(); }
|
|
|
|
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
|
char* scratch) const override {
|
|
total_reads_++;
|
|
if (offset > contents_.size()) {
|
|
return Status::InvalidArgument("invalid Read offset");
|
|
}
|
|
if (offset + n > contents_.size()) {
|
|
n = contents_.size() - static_cast<size_t>(offset);
|
|
}
|
|
if (!mmap_) {
|
|
memcpy(scratch, &contents_[static_cast<size_t>(offset)], n);
|
|
*result = Slice(scratch, n);
|
|
} else {
|
|
*result = Slice(&contents_[static_cast<size_t>(offset)], n);
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
virtual size_t GetUniqueId(char* id, size_t max_size) const override {
|
|
if (max_size < 20) {
|
|
return 0;
|
|
}
|
|
|
|
char* rid = id;
|
|
rid = EncodeVarint64(rid, uniq_id_);
|
|
rid = EncodeVarint64(rid, 0);
|
|
return static_cast<size_t>(rid-id);
|
|
}
|
|
|
|
int total_reads() const { return total_reads_; }
|
|
|
|
void set_total_reads(int tr) { total_reads_ = tr; }
|
|
|
|
private:
|
|
std::string contents_;
|
|
uint64_t uniq_id_;
|
|
bool mmap_;
|
|
mutable int total_reads_;
|
|
};
|
|
|
|
class NullLogger : public Logger {
|
|
public:
|
|
using Logger::Logv;
|
|
virtual void Logv(const char* format, va_list ap) override {}
|
|
virtual size_t GetLogFileSize() const override { return 0; }
|
|
};
|
|
|
|
// Corrupts key by changing the type
|
|
extern void CorruptKeyType(InternalKey* ikey);
|
|
|
|
extern std::string KeyStr(const std::string& user_key,
|
|
const SequenceNumber& seq, const ValueType& t,
|
|
bool corrupt = false);
|
|
|
|
class SleepingBackgroundTask {
|
|
public:
|
|
SleepingBackgroundTask()
|
|
: bg_cv_(&mutex_),
|
|
should_sleep_(true),
|
|
done_with_sleep_(false),
|
|
sleeping_(false) {}
|
|
|
|
bool IsSleeping() {
|
|
MutexLock l(&mutex_);
|
|
return sleeping_;
|
|
}
|
|
void DoSleep() {
|
|
MutexLock l(&mutex_);
|
|
sleeping_ = true;
|
|
bg_cv_.SignalAll();
|
|
while (should_sleep_) {
|
|
bg_cv_.Wait();
|
|
}
|
|
sleeping_ = false;
|
|
done_with_sleep_ = true;
|
|
bg_cv_.SignalAll();
|
|
}
|
|
void WaitUntilSleeping() {
|
|
MutexLock l(&mutex_);
|
|
while (!sleeping_ || !should_sleep_) {
|
|
bg_cv_.Wait();
|
|
}
|
|
}
|
|
void WakeUp() {
|
|
MutexLock l(&mutex_);
|
|
should_sleep_ = false;
|
|
bg_cv_.SignalAll();
|
|
}
|
|
void WaitUntilDone() {
|
|
MutexLock l(&mutex_);
|
|
while (!done_with_sleep_) {
|
|
bg_cv_.Wait();
|
|
}
|
|
}
|
|
bool WokenUp() {
|
|
MutexLock l(&mutex_);
|
|
return should_sleep_ == false;
|
|
}
|
|
|
|
void Reset() {
|
|
MutexLock l(&mutex_);
|
|
should_sleep_ = true;
|
|
done_with_sleep_ = false;
|
|
}
|
|
|
|
static void DoSleepTask(void* arg) {
|
|
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
|
|
}
|
|
|
|
private:
|
|
port::Mutex mutex_;
|
|
port::CondVar bg_cv_; // Signalled when background work finishes
|
|
bool should_sleep_;
|
|
bool done_with_sleep_;
|
|
bool sleeping_;
|
|
};
|
|
|
|
// Filters merge operands and values that are equal to `num`.
|
|
class FilterNumber : public CompactionFilter {
|
|
public:
|
|
explicit FilterNumber(uint64_t num) : num_(num) {}
|
|
|
|
std::string last_merge_operand_key() { return last_merge_operand_key_; }
|
|
|
|
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value,
|
|
std::string* new_value, bool* value_changed) const override {
|
|
if (value.size() == sizeof(uint64_t)) {
|
|
return num_ == DecodeFixed64(value.data());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool FilterMergeOperand(int level, const rocksdb::Slice& key,
|
|
const rocksdb::Slice& value) const override {
|
|
last_merge_operand_key_ = key.ToString();
|
|
if (value.size() == sizeof(uint64_t)) {
|
|
return num_ == DecodeFixed64(value.data());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
const char* Name() const override { return "FilterBadMergeOperand"; }
|
|
|
|
private:
|
|
mutable std::string last_merge_operand_key_;
|
|
uint64_t num_;
|
|
};
|
|
|
|
inline std::string EncodeInt(uint64_t x) {
|
|
std::string result;
|
|
PutFixed64(&result, x);
|
|
return result;
|
|
}
|
|
|
|
class StringEnv : public EnvWrapper {
|
|
public:
|
|
class SeqStringSource : public SequentialFile {
|
|
public:
|
|
explicit SeqStringSource(const std::string& data)
|
|
: data_(data), offset_(0) {}
|
|
~SeqStringSource() {}
|
|
Status Read(size_t n, Slice* result, char* scratch) override {
|
|
std::string output;
|
|
if (offset_ < data_.size()) {
|
|
n = std::min(data_.size() - offset_, n);
|
|
memcpy(scratch, data_.data() + offset_, n);
|
|
offset_ += n;
|
|
*result = Slice(scratch, n);
|
|
} else {
|
|
return Status::InvalidArgument(
|
|
"Attemp to read when it already reached eof.");
|
|
}
|
|
return Status::OK();
|
|
}
|
|
Status Skip(uint64_t n) override {
|
|
if (offset_ >= data_.size()) {
|
|
return Status::InvalidArgument(
|
|
"Attemp to read when it already reached eof.");
|
|
}
|
|
// TODO(yhchiang): Currently doesn't handle the overflow case.
|
|
offset_ += n;
|
|
return Status::OK();
|
|
}
|
|
|
|
private:
|
|
std::string data_;
|
|
size_t offset_;
|
|
};
|
|
|
|
class StringSink : public WritableFile {
|
|
public:
|
|
explicit StringSink(std::string* contents)
|
|
: WritableFile(), contents_(contents) {}
|
|
virtual Status Truncate(uint64_t size) override {
|
|
contents_->resize(size);
|
|
return Status::OK();
|
|
}
|
|
virtual Status Close() override { return Status::OK(); }
|
|
virtual Status Flush() override { return Status::OK(); }
|
|
virtual Status Sync() override { return Status::OK(); }
|
|
virtual Status Append(const Slice& slice) override {
|
|
contents_->append(slice.data(), slice.size());
|
|
return Status::OK();
|
|
}
|
|
|
|
private:
|
|
std::string* contents_;
|
|
};
|
|
|
|
explicit StringEnv(Env* t) : EnvWrapper(t) {}
|
|
virtual ~StringEnv() {}
|
|
|
|
const std::string& GetContent(const std::string& f) { return files_[f]; }
|
|
|
|
const Status WriteToNewFile(const std::string& file_name,
|
|
const std::string& content) {
|
|
unique_ptr<WritableFile> r;
|
|
auto s = NewWritableFile(file_name, &r, EnvOptions());
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
r->Append(content);
|
|
r->Flush();
|
|
r->Close();
|
|
assert(files_[file_name] == content);
|
|
return Status::OK();
|
|
}
|
|
|
|
// The following text is boilerplate that forwards all methods to target()
|
|
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
|
|
const EnvOptions& options) override {
|
|
auto iter = files_.find(f);
|
|
if (iter == files_.end()) {
|
|
return Status::NotFound("The specified file does not exist", f);
|
|
}
|
|
r->reset(new SeqStringSource(iter->second));
|
|
return Status::OK();
|
|
}
|
|
Status NewRandomAccessFile(const std::string& f,
|
|
unique_ptr<RandomAccessFile>* r,
|
|
const EnvOptions& options) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
|
|
const EnvOptions& options) override {
|
|
auto iter = files_.find(f);
|
|
if (iter != files_.end()) {
|
|
return Status::IOError("The specified file already exists", f);
|
|
}
|
|
r->reset(new StringSink(&files_[f]));
|
|
return Status::OK();
|
|
}
|
|
virtual Status NewDirectory(const std::string& name,
|
|
unique_ptr<Directory>* result) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status FileExists(const std::string& f) override {
|
|
if (files_.find(f) == files_.end()) {
|
|
return Status::NotFound();
|
|
}
|
|
return Status::OK();
|
|
}
|
|
Status GetChildren(const std::string& dir,
|
|
std::vector<std::string>* r) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status DeleteFile(const std::string& f) override {
|
|
files_.erase(f);
|
|
return Status::OK();
|
|
}
|
|
Status CreateDir(const std::string& d) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status CreateDirIfMissing(const std::string& d) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status DeleteDir(const std::string& d) override {
|
|
return Status::NotSupported();
|
|
}
|
|
Status GetFileSize(const std::string& f, uint64_t* s) override {
|
|
auto iter = files_.find(f);
|
|
if (iter == files_.end()) {
|
|
return Status::NotFound("The specified file does not exist:", f);
|
|
}
|
|
*s = iter->second.size();
|
|
return Status::OK();
|
|
}
|
|
|
|
Status GetFileModificationTime(const std::string& fname,
|
|
uint64_t* file_mtime) override {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
Status RenameFile(const std::string& s, const std::string& t) override {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
Status LinkFile(const std::string& s, const std::string& t) override {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
Status LockFile(const std::string& f, FileLock** l) override {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
Status UnlockFile(FileLock* l) override { return Status::NotSupported(); }
|
|
|
|
protected:
|
|
std::unordered_map<std::string, std::string> files_;
|
|
};
|
|
|
|
// Randomly initialize the given DBOptions
|
|
void RandomInitDBOptions(DBOptions* db_opt, Random* rnd);
|
|
|
|
// Randomly initialize the given ColumnFamilyOptions
|
|
// Note that the caller is responsible for releasing non-null
|
|
// cf_opt->compaction_filter.
|
|
void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd);
|
|
|
|
// A dummy merge operator which can change its name
|
|
class ChanglingMergeOperator : public MergeOperator {
|
|
public:
|
|
explicit ChanglingMergeOperator(const std::string& name)
|
|
: name_(name + "MergeOperator") {}
|
|
~ChanglingMergeOperator() {}
|
|
|
|
void SetName(const std::string& name) { name_ = name; }
|
|
|
|
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
|
|
MergeOperationOutput* merge_out) const override {
|
|
return false;
|
|
}
|
|
virtual bool PartialMergeMulti(const Slice& key,
|
|
const std::deque<Slice>& operand_list,
|
|
std::string* new_value,
|
|
Logger* logger) const override {
|
|
return false;
|
|
}
|
|
virtual const char* Name() const override { return name_.c_str(); }
|
|
|
|
protected:
|
|
std::string name_;
|
|
};
|
|
|
|
// Returns a dummy merge operator with random name.
|
|
MergeOperator* RandomMergeOperator(Random* rnd);
|
|
|
|
// A dummy compaction filter which can change its name
|
|
class ChanglingCompactionFilter : public CompactionFilter {
|
|
public:
|
|
explicit ChanglingCompactionFilter(const std::string& name)
|
|
: name_(name + "CompactionFilter") {}
|
|
~ChanglingCompactionFilter() {}
|
|
|
|
void SetName(const std::string& name) { name_ = name; }
|
|
|
|
bool Filter(int level, const Slice& key, const Slice& existing_value,
|
|
std::string* new_value, bool* value_changed) const override {
|
|
return false;
|
|
}
|
|
|
|
const char* Name() const override { return name_.c_str(); }
|
|
|
|
private:
|
|
std::string name_;
|
|
};
|
|
|
|
// Returns a dummy compaction filter with a random name.
|
|
CompactionFilter* RandomCompactionFilter(Random* rnd);
|
|
|
|
// A dummy compaction filter factory which can change its name
|
|
class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
|
|
public:
|
|
explicit ChanglingCompactionFilterFactory(const std::string& name)
|
|
: name_(name + "CompactionFilterFactory") {}
|
|
~ChanglingCompactionFilterFactory() {}
|
|
|
|
void SetName(const std::string& name) { name_ = name; }
|
|
|
|
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
|
const CompactionFilter::Context& context) override {
|
|
return std::unique_ptr<CompactionFilter>();
|
|
}
|
|
|
|
// Returns a name that identifies this compaction filter factory.
|
|
const char* Name() const override { return name_.c_str(); }
|
|
|
|
protected:
|
|
std::string name_;
|
|
};
|
|
|
|
CompressionType RandomCompressionType(Random* rnd);
|
|
|
|
void RandomCompressionTypeVector(const size_t count,
|
|
std::vector<CompressionType>* types,
|
|
Random* rnd);
|
|
|
|
CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd);
|
|
|
|
const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1);
|
|
|
|
TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1);
|
|
|
|
std::string RandomName(Random* rnd, const size_t len);
|
|
|
|
} // namespace test
|
|
} // namespace rocksdb
|