rocksdb/db/memtable.cc
Dhruba Borthakur 806e264350 Ability for rocksdb to compact when flushing the in-memory memtable to a file in L0.
Summary:
Rocks accumulates recent writes and deletes in the in-memory memtable.
When the memtable is full, it writes the contents on the memtable to
a file in L0.

This patch removes redundant records at the time of the flush. If there
are multiple versions of the same key in the memtable, then only the
most recent one is dumped into the output file. The purging of
redundant records occur only if the most recent snapshot is earlier
than the earliest record in the memtable.

Should we switch on this feature by default or should we keep this feature
turned off in the default settings?

Test Plan: Added test case to db_test.cc

Reviewers: sheki, vamsi, emayanke, heyongqiang

Reviewed By: sheki

CC: leveldb

Differential Revision: https://reviews.facebook.net/D8991
2013-03-04 00:01:47 -08:00

157 lines
5 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h"
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
namespace leveldb {
static Slice GetLengthPrefixedSlice(const char* data) {
uint32_t len;
const char* p = data;
p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted
return Slice(p, len);
}
MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel)
: comparator_(cmp),
refs_(0),
table_(comparator_, &arena_),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
edit_(numlevel),
first_seqno_(0) {
}
MemTable::~MemTable() {
assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
return comparator.Compare(a, b);
}
// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
scratch->clear();
PutVarint32(scratch, target.size());
scratch->append(target.data(), target.size());
return scratch->data();
}
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
virtual void Prev() { iter_.Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
MemTableIterator(const MemTableIterator&);
void operator=(const MemTableIterator&);
};
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
}
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == (unsigned)encoded_len);
table_.Insert(buf);
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
first_seqno_ = s;
}
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
} // namespace leveldb