rocksdb/db/memtable.cc
Jim Paton 74781a0c49 Add three new MemTableRep's
Summary:
This patch adds three new MemTableRep's: UnsortedRep, PrefixHashRep, and VectorRep.

UnsortedRep stores keys in an std::unordered_map of std::sets. When an iterator is requested, it dumps the keys into an std::set and iterates over that.

VectorRep stores keys in an std::vector. When an iterator is requested, it creates a copy of the vector and sorts it using std::sort. The iterator accesses that new vector.

PrefixHashRep stores keys in an unordered_map mapping prefixes to ordered sets.

I also added one API change. I added a function MemTableRep::MarkImmutable. This function is called when the rep is added to the immutable list. It doesn't do anything yet, but it seems like that could be useful. In particular, for the vectorrep, it means we could elide the extra copy and just sort in place. The only reason I haven't done that yet is because the use of the ArenaAllocator complicates things (I can elaborate on this if needed).

Test Plan:
make -j32 check
./db_stress --memtablerep=vector
./db_stress --memtablerep=unsorted
./db_stress --memtablerep=prefixhash --prefix_size=10

Reviewers: dhruba, haobo, emayanke

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12117
2013-08-22 23:10:02 -07:00

241 lines
8 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h"
#include <memory>
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/merge_operator.h"
#include "util/coding.h"
#include "util/murmurhash.h"
namespace leveldb {
MemTable::MemTable(const InternalKeyComparator& cmp,
std::shared_ptr<MemTableRepFactory> table_factory,
int numlevel,
const Options& options)
: comparator_(cmp),
refs_(0),
arena_impl_(options.arena_block_size),
table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
edit_(numlevel),
first_seqno_(0),
mem_logfile_number_(0) { }
MemTable::~MemTable() {
assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() {
return arena_impl_.ApproximateMemoryUsage() +
table_->ApproximateMemoryUsage();
}
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
return comparator.Compare(a, b);
}
Slice MemTableRep::UserKey(const char* key) const {
Slice slice = GetLengthPrefixedSlice(key);
return Slice(slice.data(), slice.size() - 8);
}
// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
scratch->clear();
PutVarint32(scratch, target.size());
scratch->append(target.data(), target.size());
return scratch->data();
}
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTableRep* table)
: iter_(table->GetIterator()) { }
MemTableIterator(MemTableRep* table, const Slice* prefix)
: iter_(table->GetPrefixIterator(*prefix)) { }
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Next() { iter_->Next(); }
virtual void Prev() { iter_->Prev(); }
virtual Slice key() const {
return GetLengthPrefixedSlice(iter_->key());
}
virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
std::shared_ptr<MemTableRep::Iterator> iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
MemTableIterator(const MemTableIterator&);
void operator=(const MemTableIterator&);
};
Iterator* MemTable::NewIterator(const Slice* prefix) {
if (prefix) {
return new MemTableIterator(table_.get(), prefix);
} else {
return new MemTableIterator(table_.get());
}
}
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_impl_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == (unsigned)encoded_len);
table_->Insert(buf);
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
first_seqno_ = s;
}
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options) {
Slice memkey = key.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(memkey.data());
// It is the caller's responsibility to allocate/delete operands list
assert(operands != nullptr);
bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get();
auto logger = options.info_log;
std::string merge_result;
for (; iter->Valid(); iter->Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*s = Status::OK();
if (merge_in_progress) {
assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
value->assign(v.data(), v.size());
}
return true;
}
case kTypeDeletion: {
if (merge_in_progress) {
assert(merge_operator);
*s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
*s = Status::NotFound(Slice());
}
return true;
}
case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
merge_in_progress = true;
operands->push_front(v.ToString());
while(operands->size() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(key.user_key(),
Slice((*operands)[0]),
Slice((*operands)[1]),
&merge_result,
logger.get())) {
operands->pop_front();
swap(operands->front(), merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
break;
}
case kTypeLogData:
assert(false);
break;
}
} else {
// exit loop if user key does not match
break;
}
}
// No change to value, since we have not yet found a Put/Delete
if (merge_in_progress) {
*s = Status::MergeInProgress("");
}
return false;
}
} // namespace leveldb