mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
813719e952
Summary: Currently DB does not accept duplicate keys (keys with the same user key and the same sequence number). If Memtable returns false when receiving such keys, we can benefit from this signal to properly increase the sequence number in the rare cases when we have a duplicate key in the write batch written to DB under WritePrepared transactions. Closes https://github.com/facebook/rocksdb/pull/3418 Differential Revision: D6822412 Pulled By: maysamyabandeh fbshipit-source-id: adea3ce5073131cd38ed52b16bea0673b1a19e77
849 lines
29 KiB
C++
849 lines
29 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
#include "memtable/hash_linklist_rep.h"
|
|
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include "db/memtable.h"
|
|
#include "memtable/skiplist.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/slice_transform.h"
|
|
#include "util/arena.h"
|
|
#include "util/murmurhash.h"
|
|
|
|
namespace rocksdb {
|
|
namespace {
|
|
|
|
typedef const char* Key;
|
|
typedef SkipList<Key, const MemTableRep::KeyComparator&> MemtableSkipList;
|
|
typedef std::atomic<void*> Pointer;
|
|
|
|
// A data structure used as the header of a link list of a hash bucket.
|
|
struct BucketHeader {
|
|
Pointer next;
|
|
std::atomic<uint32_t> num_entries;
|
|
|
|
explicit BucketHeader(void* n, uint32_t count)
|
|
: next(n), num_entries(count) {}
|
|
|
|
bool IsSkipListBucket() {
|
|
return next.load(std::memory_order_relaxed) == this;
|
|
}
|
|
|
|
uint32_t GetNumEntries() const {
|
|
return num_entries.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
// REQUIRES: called from single-threaded Insert()
|
|
void IncNumEntries() {
|
|
// Only one thread can do write at one time. No need to do atomic
|
|
// incremental. Update it with relaxed load and store.
|
|
num_entries.store(GetNumEntries() + 1, std::memory_order_relaxed);
|
|
}
|
|
};
|
|
|
|
// A data structure used as the header of a skip list of a hash bucket.
|
|
struct SkipListBucketHeader {
|
|
BucketHeader Counting_header;
|
|
MemtableSkipList skip_list;
|
|
|
|
explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp,
|
|
Allocator* allocator, uint32_t count)
|
|
: Counting_header(this, // Pointing to itself to indicate header type.
|
|
count),
|
|
skip_list(cmp, allocator) {}
|
|
};
|
|
|
|
struct Node {
|
|
// Accessors/mutators for links. Wrapped in methods so we can
|
|
// add the appropriate barriers as necessary.
|
|
Node* Next() {
|
|
// Use an 'acquire load' so that we observe a fully initialized
|
|
// version of the returned Node.
|
|
return next_.load(std::memory_order_acquire);
|
|
}
|
|
void SetNext(Node* x) {
|
|
// Use a 'release store' so that anybody who reads through this
|
|
// pointer observes a fully initialized version of the inserted node.
|
|
next_.store(x, std::memory_order_release);
|
|
}
|
|
// No-barrier variants that can be safely used in a few locations.
|
|
Node* NoBarrier_Next() {
|
|
return next_.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
void NoBarrier_SetNext(Node* x) { next_.store(x, std::memory_order_relaxed); }
|
|
|
|
// Needed for placement new below which is fine
|
|
Node() {}
|
|
|
|
private:
|
|
std::atomic<Node*> next_;
|
|
|
|
// Prohibit copying due to the below
|
|
Node(const Node&) = delete;
|
|
Node& operator=(const Node&) = delete;
|
|
|
|
public:
|
|
char key[1];
|
|
};
|
|
|
|
// Memory structure of the mem table:
|
|
// It is a hash table, each bucket points to one entry, a linked list or a
|
|
// skip list. In order to track total number of records in a bucket to determine
|
|
// whether should switch to skip list, a header is added just to indicate
|
|
// number of entries in the bucket.
|
|
//
|
|
//
|
|
// +-----> NULL Case 1. Empty bucket
|
|
// |
|
|
// |
|
|
// | +---> +-------+
|
|
// | | | Next +--> NULL
|
|
// | | +-------+
|
|
// +-----+ | | | | Case 2. One Entry in bucket.
|
|
// | +-+ | | Data | next pointer points to
|
|
// +-----+ | | | NULL. All other cases
|
|
// | | | | | next pointer is not NULL.
|
|
// +-----+ | +-------+
|
|
// | +---+
|
|
// +-----+ +-> +-------+ +> +-------+ +-> +-------+
|
|
// | | | | Next +--+ | Next +--+ | Next +-->NULL
|
|
// +-----+ | +-------+ +-------+ +-------+
|
|
// | +-----+ | Count | | | | |
|
|
// +-----+ +-------+ | Data | | Data |
|
|
// | | | | | |
|
|
// +-----+ Case 3. | | | |
|
|
// | | A header +-------+ +-------+
|
|
// +-----+ points to
|
|
// | | a linked list. Count indicates total number
|
|
// +-----+ of rows in this bucket.
|
|
// | |
|
|
// +-----+ +-> +-------+ <--+
|
|
// | | | | Next +----+
|
|
// +-----+ | +-------+ Case 4. A header points to a skip
|
|
// | +----+ | Count | list and next pointer points to
|
|
// +-----+ +-------+ itself, to distinguish case 3 or 4.
|
|
// | | | | Count still is kept to indicates total
|
|
// +-----+ | Skip +--> of entries in the bucket for debugging
|
|
// | | | List | Data purpose.
|
|
// | | | +-->
|
|
// +-----+ | |
|
|
// | | +-------+
|
|
// +-----+
|
|
//
|
|
// We don't have data race when changing cases because:
|
|
// (1) When changing from case 2->3, we create a new bucket header, put the
|
|
// single node there first without changing the original node, and do a
|
|
// release store when changing the bucket pointer. In that case, a reader
|
|
// who sees a stale value of the bucket pointer will read this node, while
|
|
// a reader sees the correct value because of the release store.
|
|
// (2) When changing case 3->4, a new header is created with skip list points
|
|
// to the data, before doing an acquire store to change the bucket pointer.
|
|
// The old header and nodes are never changed, so any reader sees any
|
|
// of those existing pointers will guarantee to be able to iterate to the
|
|
// end of the linked list.
|
|
// (3) Header's next pointer in case 3 might change, but they are never equal
|
|
// to itself, so no matter a reader sees any stale or newer value, it will
|
|
// be able to correctly distinguish case 3 and 4.
|
|
//
|
|
// The reason that we use case 2 is we want to make the format to be efficient
|
|
// when the utilization of buckets is relatively low. If we use case 3 for
|
|
// single entry bucket, we will need to waste 12 bytes for every entry,
|
|
// which can be significant decrease of memory utilization.
|
|
class HashLinkListRep : public MemTableRep {
|
|
public:
|
|
HashLinkListRep(const MemTableRep::KeyComparator& compare,
|
|
Allocator* allocator, const SliceTransform* transform,
|
|
size_t bucket_size, uint32_t threshold_use_skiplist,
|
|
size_t huge_page_tlb_size, Logger* logger,
|
|
int bucket_entries_logging_threshold,
|
|
bool if_log_bucket_dist_when_flash);
|
|
|
|
virtual KeyHandle Allocate(const size_t len, char** buf) override;
|
|
|
|
virtual bool Insert(KeyHandle handle) override;
|
|
|
|
virtual bool Contains(const char* key) const override;
|
|
|
|
virtual size_t ApproximateMemoryUsage() override;
|
|
|
|
virtual void Get(const LookupKey& k, void* callback_args,
|
|
bool (*callback_func)(void* arg,
|
|
const char* entry)) override;
|
|
|
|
virtual ~HashLinkListRep();
|
|
|
|
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
|
|
|
|
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
|
|
Arena* arena = nullptr) override;
|
|
|
|
private:
|
|
friend class DynamicIterator;
|
|
|
|
size_t bucket_size_;
|
|
|
|
// Maps slices (which are transformed user keys) to buckets of keys sharing
|
|
// the same transform.
|
|
Pointer* buckets_;
|
|
|
|
const uint32_t threshold_use_skiplist_;
|
|
|
|
// The user-supplied transform whose domain is the user keys.
|
|
const SliceTransform* transform_;
|
|
|
|
const MemTableRep::KeyComparator& compare_;
|
|
|
|
Logger* logger_;
|
|
int bucket_entries_logging_threshold_;
|
|
bool if_log_bucket_dist_when_flash_;
|
|
|
|
bool LinkListContains(Node* head, const Slice& key) const;
|
|
|
|
SkipListBucketHeader* GetSkipListBucketHeader(Pointer* first_next_pointer)
|
|
const;
|
|
|
|
Node* GetLinkListFirstNode(Pointer* first_next_pointer) const;
|
|
|
|
Slice GetPrefix(const Slice& internal_key) const {
|
|
return transform_->Transform(ExtractUserKey(internal_key));
|
|
}
|
|
|
|
size_t GetHash(const Slice& slice) const {
|
|
return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
|
|
bucket_size_;
|
|
}
|
|
|
|
Pointer* GetBucket(size_t i) const {
|
|
return static_cast<Pointer*>(buckets_[i].load(std::memory_order_acquire));
|
|
}
|
|
|
|
Pointer* GetBucket(const Slice& slice) const {
|
|
return GetBucket(GetHash(slice));
|
|
}
|
|
|
|
bool Equal(const Slice& a, const Key& b) const {
|
|
return (compare_(b, a) == 0);
|
|
}
|
|
|
|
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
|
|
|
|
bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const {
|
|
// nullptr n is considered infinite
|
|
return (n != nullptr) && (compare_(n->key, internal_key) < 0);
|
|
}
|
|
|
|
bool KeyIsAfterNode(const Key& key, const Node* n) const {
|
|
// nullptr n is considered infinite
|
|
return (n != nullptr) && (compare_(n->key, key) < 0);
|
|
}
|
|
|
|
bool KeyIsAfterOrAtNode(const Slice& internal_key, const Node* n) const {
|
|
// nullptr n is considered infinite
|
|
return (n != nullptr) && (compare_(n->key, internal_key) <= 0);
|
|
}
|
|
|
|
bool KeyIsAfterOrAtNode(const Key& key, const Node* n) const {
|
|
// nullptr n is considered infinite
|
|
return (n != nullptr) && (compare_(n->key, key) <= 0);
|
|
}
|
|
|
|
Node* FindGreaterOrEqualInBucket(Node* head, const Slice& key) const;
|
|
Node* FindLessOrEqualInBucket(Node* head, const Slice& key) const;
|
|
|
|
class FullListIterator : public MemTableRep::Iterator {
|
|
public:
|
|
explicit FullListIterator(MemtableSkipList* list, Allocator* allocator)
|
|
: iter_(list), full_list_(list), allocator_(allocator) {}
|
|
|
|
virtual ~FullListIterator() {
|
|
}
|
|
|
|
// Returns true iff the iterator is positioned at a valid node.
|
|
virtual bool Valid() const override { return iter_.Valid(); }
|
|
|
|
// Returns the key at the current position.
|
|
// REQUIRES: Valid()
|
|
virtual const char* key() const override {
|
|
assert(Valid());
|
|
return iter_.key();
|
|
}
|
|
|
|
// Advances to the next position.
|
|
// REQUIRES: Valid()
|
|
virtual void Next() override {
|
|
assert(Valid());
|
|
iter_.Next();
|
|
}
|
|
|
|
// Advances to the previous position.
|
|
// REQUIRES: Valid()
|
|
virtual void Prev() override {
|
|
assert(Valid());
|
|
iter_.Prev();
|
|
}
|
|
|
|
// Advance to the first entry with a key >= target
|
|
virtual void Seek(const Slice& internal_key,
|
|
const char* memtable_key) override {
|
|
const char* encoded_key =
|
|
(memtable_key != nullptr) ?
|
|
memtable_key : EncodeKey(&tmp_, internal_key);
|
|
iter_.Seek(encoded_key);
|
|
}
|
|
|
|
// Retreat to the last entry with a key <= target
|
|
virtual void SeekForPrev(const Slice& internal_key,
|
|
const char* memtable_key) override {
|
|
const char* encoded_key = (memtable_key != nullptr)
|
|
? memtable_key
|
|
: EncodeKey(&tmp_, internal_key);
|
|
iter_.SeekForPrev(encoded_key);
|
|
}
|
|
|
|
// Position at the first entry in collection.
|
|
// Final state of iterator is Valid() iff collection is not empty.
|
|
virtual void SeekToFirst() override { iter_.SeekToFirst(); }
|
|
|
|
// Position at the last entry in collection.
|
|
// Final state of iterator is Valid() iff collection is not empty.
|
|
virtual void SeekToLast() override { iter_.SeekToLast(); }
|
|
private:
|
|
MemtableSkipList::Iterator iter_;
|
|
// To destruct with the iterator.
|
|
std::unique_ptr<MemtableSkipList> full_list_;
|
|
std::unique_ptr<Allocator> allocator_;
|
|
std::string tmp_; // For passing to EncodeKey
|
|
};
|
|
|
|
class LinkListIterator : public MemTableRep::Iterator {
|
|
public:
|
|
explicit LinkListIterator(const HashLinkListRep* const hash_link_list_rep,
|
|
Node* head)
|
|
: hash_link_list_rep_(hash_link_list_rep),
|
|
head_(head),
|
|
node_(nullptr) {}
|
|
|
|
virtual ~LinkListIterator() {}
|
|
|
|
// Returns true iff the iterator is positioned at a valid node.
|
|
virtual bool Valid() const override { return node_ != nullptr; }
|
|
|
|
// Returns the key at the current position.
|
|
// REQUIRES: Valid()
|
|
virtual const char* key() const override {
|
|
assert(Valid());
|
|
return node_->key;
|
|
}
|
|
|
|
// Advances to the next position.
|
|
// REQUIRES: Valid()
|
|
virtual void Next() override {
|
|
assert(Valid());
|
|
node_ = node_->Next();
|
|
}
|
|
|
|
// Advances to the previous position.
|
|
// REQUIRES: Valid()
|
|
virtual void Prev() override {
|
|
// Prefix iterator does not support total order.
|
|
// We simply set the iterator to invalid state
|
|
Reset(nullptr);
|
|
}
|
|
|
|
// Advance to the first entry with a key >= target
|
|
virtual void Seek(const Slice& internal_key,
|
|
const char* memtable_key) override {
|
|
node_ = hash_link_list_rep_->FindGreaterOrEqualInBucket(head_,
|
|
internal_key);
|
|
}
|
|
|
|
// Retreat to the last entry with a key <= target
|
|
virtual void SeekForPrev(const Slice& internal_key,
|
|
const char* memtable_key) override {
|
|
// Since we do not support Prev()
|
|
// We simply do not support SeekForPrev
|
|
Reset(nullptr);
|
|
}
|
|
|
|
// Position at the first entry in collection.
|
|
// Final state of iterator is Valid() iff collection is not empty.
|
|
virtual void SeekToFirst() override {
|
|
// Prefix iterator does not support total order.
|
|
// We simply set the iterator to invalid state
|
|
Reset(nullptr);
|
|
}
|
|
|
|
// Position at the last entry in collection.
|
|
// Final state of iterator is Valid() iff collection is not empty.
|
|
virtual void SeekToLast() override {
|
|
// Prefix iterator does not support total order.
|
|
// We simply set the iterator to invalid state
|
|
Reset(nullptr);
|
|
}
|
|
|
|
protected:
|
|
void Reset(Node* head) {
|
|
head_ = head;
|
|
node_ = nullptr;
|
|
}
|
|
private:
|
|
friend class HashLinkListRep;
|
|
const HashLinkListRep* const hash_link_list_rep_;
|
|
Node* head_;
|
|
Node* node_;
|
|
|
|
virtual void SeekToHead() {
|
|
node_ = head_;
|
|
}
|
|
};
|
|
|
|
class DynamicIterator : public HashLinkListRep::LinkListIterator {
|
|
public:
|
|
explicit DynamicIterator(HashLinkListRep& memtable_rep)
|
|
: HashLinkListRep::LinkListIterator(&memtable_rep, nullptr),
|
|
memtable_rep_(memtable_rep) {}
|
|
|
|
// Advance to the first entry with a key >= target
|
|
virtual void Seek(const Slice& k, const char* memtable_key) override {
|
|
auto transformed = memtable_rep_.GetPrefix(k);
|
|
auto* bucket = memtable_rep_.GetBucket(transformed);
|
|
|
|
SkipListBucketHeader* skip_list_header =
|
|
memtable_rep_.GetSkipListBucketHeader(bucket);
|
|
if (skip_list_header != nullptr) {
|
|
// The bucket is organized as a skip list
|
|
if (!skip_list_iter_) {
|
|
skip_list_iter_.reset(
|
|
new MemtableSkipList::Iterator(&skip_list_header->skip_list));
|
|
} else {
|
|
skip_list_iter_->SetList(&skip_list_header->skip_list);
|
|
}
|
|
if (memtable_key != nullptr) {
|
|
skip_list_iter_->Seek(memtable_key);
|
|
} else {
|
|
IterKey encoded_key;
|
|
encoded_key.EncodeLengthPrefixedKey(k);
|
|
skip_list_iter_->Seek(encoded_key.GetUserKey().data());
|
|
}
|
|
} else {
|
|
// The bucket is organized as a linked list
|
|
skip_list_iter_.reset();
|
|
Reset(memtable_rep_.GetLinkListFirstNode(bucket));
|
|
HashLinkListRep::LinkListIterator::Seek(k, memtable_key);
|
|
}
|
|
}
|
|
|
|
virtual bool Valid() const override {
|
|
if (skip_list_iter_) {
|
|
return skip_list_iter_->Valid();
|
|
}
|
|
return HashLinkListRep::LinkListIterator::Valid();
|
|
}
|
|
|
|
virtual const char* key() const override {
|
|
if (skip_list_iter_) {
|
|
return skip_list_iter_->key();
|
|
}
|
|
return HashLinkListRep::LinkListIterator::key();
|
|
}
|
|
|
|
virtual void Next() override {
|
|
if (skip_list_iter_) {
|
|
skip_list_iter_->Next();
|
|
} else {
|
|
HashLinkListRep::LinkListIterator::Next();
|
|
}
|
|
}
|
|
|
|
private:
|
|
// the underlying memtable
|
|
const HashLinkListRep& memtable_rep_;
|
|
std::unique_ptr<MemtableSkipList::Iterator> skip_list_iter_;
|
|
};
|
|
|
|
class EmptyIterator : public MemTableRep::Iterator {
|
|
// This is used when there wasn't a bucket. It is cheaper than
|
|
// instantiating an empty bucket over which to iterate.
|
|
public:
|
|
EmptyIterator() { }
|
|
virtual bool Valid() const override { return false; }
|
|
virtual const char* key() const override {
|
|
assert(false);
|
|
return nullptr;
|
|
}
|
|
virtual void Next() override {}
|
|
virtual void Prev() override {}
|
|
virtual void Seek(const Slice& user_key,
|
|
const char* memtable_key) override {}
|
|
virtual void SeekForPrev(const Slice& user_key,
|
|
const char* memtable_key) override {}
|
|
virtual void SeekToFirst() override {}
|
|
virtual void SeekToLast() override {}
|
|
|
|
private:
|
|
};
|
|
};
|
|
|
|
HashLinkListRep::HashLinkListRep(
|
|
const MemTableRep::KeyComparator& compare, Allocator* allocator,
|
|
const SliceTransform* transform, size_t bucket_size,
|
|
uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, Logger* logger,
|
|
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash)
|
|
: MemTableRep(allocator),
|
|
bucket_size_(bucket_size),
|
|
// Threshold to use skip list doesn't make sense if less than 3, so we
|
|
// force it to be minimum of 3 to simplify implementation.
|
|
threshold_use_skiplist_(std::max(threshold_use_skiplist, 3U)),
|
|
transform_(transform),
|
|
compare_(compare),
|
|
logger_(logger),
|
|
bucket_entries_logging_threshold_(bucket_entries_logging_threshold),
|
|
if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {
|
|
char* mem = allocator_->AllocateAligned(sizeof(Pointer) * bucket_size,
|
|
huge_page_tlb_size, logger);
|
|
|
|
buckets_ = new (mem) Pointer[bucket_size];
|
|
|
|
for (size_t i = 0; i < bucket_size_; ++i) {
|
|
buckets_[i].store(nullptr, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
|
|
HashLinkListRep::~HashLinkListRep() {
|
|
}
|
|
|
|
KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) {
|
|
char* mem = allocator_->AllocateAligned(sizeof(Node) + len);
|
|
Node* x = new (mem) Node();
|
|
*buf = x->key;
|
|
return static_cast<void*>(x);
|
|
}
|
|
|
|
SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
|
|
Pointer* first_next_pointer) const {
|
|
if (first_next_pointer == nullptr) {
|
|
return nullptr;
|
|
}
|
|
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
|
|
// Single entry bucket
|
|
return nullptr;
|
|
}
|
|
// Counting header
|
|
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
|
if (header->IsSkipListBucket()) {
|
|
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
|
auto* skip_list_bucket_header =
|
|
reinterpret_cast<SkipListBucketHeader*>(header);
|
|
assert(skip_list_bucket_header->Counting_header.next.load(
|
|
std::memory_order_relaxed) == header);
|
|
return skip_list_bucket_header;
|
|
}
|
|
assert(header->GetNumEntries() <= threshold_use_skiplist_);
|
|
return nullptr;
|
|
}
|
|
|
|
Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
|
|
if (first_next_pointer == nullptr) {
|
|
return nullptr;
|
|
}
|
|
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
|
|
// Single entry bucket
|
|
return reinterpret_cast<Node*>(first_next_pointer);
|
|
}
|
|
// Counting header
|
|
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
|
if (!header->IsSkipListBucket()) {
|
|
assert(header->GetNumEntries() <= threshold_use_skiplist_);
|
|
return reinterpret_cast<Node*>(
|
|
header->next.load(std::memory_order_acquire));
|
|
}
|
|
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
|
return nullptr;
|
|
}
|
|
|
|
bool HashLinkListRep::Insert(KeyHandle handle) {
|
|
Node* x = static_cast<Node*>(handle);
|
|
assert(!Contains(x->key));
|
|
Slice internal_key = GetLengthPrefixedSlice(x->key);
|
|
auto transformed = GetPrefix(internal_key);
|
|
auto& bucket = buckets_[GetHash(transformed)];
|
|
Pointer* first_next_pointer =
|
|
static_cast<Pointer*>(bucket.load(std::memory_order_relaxed));
|
|
|
|
if (first_next_pointer == nullptr) {
|
|
// Case 1. empty bucket
|
|
// NoBarrier_SetNext() suffices since we will add a barrier when
|
|
// we publish a pointer to "x" in prev[i].
|
|
x->NoBarrier_SetNext(nullptr);
|
|
bucket.store(x, std::memory_order_release);
|
|
return true;
|
|
}
|
|
|
|
BucketHeader* header = nullptr;
|
|
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
|
|
// Case 2. only one entry in the bucket
|
|
// Need to convert to a Counting bucket and turn to case 4.
|
|
Node* first = reinterpret_cast<Node*>(first_next_pointer);
|
|
// Need to add a bucket header.
|
|
// We have to first convert it to a bucket with header before inserting
|
|
// the new node. Otherwise, we might need to change next pointer of first.
|
|
// In that case, a reader might sees the next pointer is NULL and wrongly
|
|
// think the node is a bucket header.
|
|
auto* mem = allocator_->AllocateAligned(sizeof(BucketHeader));
|
|
header = new (mem) BucketHeader(first, 1);
|
|
bucket.store(header, std::memory_order_release);
|
|
} else {
|
|
header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
|
if (header->IsSkipListBucket()) {
|
|
// Case 4. Bucket is already a skip list
|
|
assert(header->GetNumEntries() > threshold_use_skiplist_);
|
|
auto* skip_list_bucket_header =
|
|
reinterpret_cast<SkipListBucketHeader*>(header);
|
|
// Only one thread can execute Insert() at one time. No need to do atomic
|
|
// incremental.
|
|
skip_list_bucket_header->Counting_header.IncNumEntries();
|
|
skip_list_bucket_header->skip_list.Insert(x->key);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
if (bucket_entries_logging_threshold_ > 0 &&
|
|
header->GetNumEntries() ==
|
|
static_cast<uint32_t>(bucket_entries_logging_threshold_)) {
|
|
Info(logger_, "HashLinkedList bucket %" ROCKSDB_PRIszt
|
|
" has more than %d "
|
|
"entries. Key to insert: %s",
|
|
GetHash(transformed), header->GetNumEntries(),
|
|
GetLengthPrefixedSlice(x->key).ToString(true).c_str());
|
|
}
|
|
|
|
if (header->GetNumEntries() == threshold_use_skiplist_) {
|
|
// Case 3. number of entries reaches the threshold so need to convert to
|
|
// skip list.
|
|
LinkListIterator bucket_iter(
|
|
this, reinterpret_cast<Node*>(
|
|
first_next_pointer->load(std::memory_order_relaxed)));
|
|
auto mem = allocator_->AllocateAligned(sizeof(SkipListBucketHeader));
|
|
SkipListBucketHeader* new_skip_list_header = new (mem)
|
|
SkipListBucketHeader(compare_, allocator_, header->GetNumEntries() + 1);
|
|
auto& skip_list = new_skip_list_header->skip_list;
|
|
|
|
// Add all current entries to the skip list
|
|
for (bucket_iter.SeekToHead(); bucket_iter.Valid(); bucket_iter.Next()) {
|
|
skip_list.Insert(bucket_iter.key());
|
|
}
|
|
|
|
// insert the new entry
|
|
skip_list.Insert(x->key);
|
|
// Set the bucket
|
|
bucket.store(new_skip_list_header, std::memory_order_release);
|
|
} else {
|
|
// Case 5. Need to insert to the sorted linked list without changing the
|
|
// header.
|
|
Node* first =
|
|
reinterpret_cast<Node*>(header->next.load(std::memory_order_relaxed));
|
|
assert(first != nullptr);
|
|
// Advance counter unless the bucket needs to be advanced to skip list.
|
|
// In that case, we need to make sure the previous count never exceeds
|
|
// threshold_use_skiplist_ to avoid readers to cast to wrong format.
|
|
header->IncNumEntries();
|
|
|
|
Node* cur = first;
|
|
Node* prev = nullptr;
|
|
while (true) {
|
|
if (cur == nullptr) {
|
|
break;
|
|
}
|
|
Node* next = cur->Next();
|
|
// Make sure the lists are sorted.
|
|
// If x points to head_ or next points nullptr, it is trivially satisfied.
|
|
assert((cur == first) || (next == nullptr) ||
|
|
KeyIsAfterNode(next->key, cur));
|
|
if (KeyIsAfterNode(internal_key, cur)) {
|
|
// Keep searching in this list
|
|
prev = cur;
|
|
cur = next;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Our data structure does not allow duplicate insertion
|
|
assert(cur == nullptr || !Equal(x->key, cur->key));
|
|
|
|
// NoBarrier_SetNext() suffices since we will add a barrier when
|
|
// we publish a pointer to "x" in prev[i].
|
|
x->NoBarrier_SetNext(cur);
|
|
|
|
if (prev) {
|
|
prev->SetNext(x);
|
|
} else {
|
|
header->next.store(static_cast<void*>(x), std::memory_order_release);
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool HashLinkListRep::Contains(const char* key) const {
|
|
Slice internal_key = GetLengthPrefixedSlice(key);
|
|
|
|
auto transformed = GetPrefix(internal_key);
|
|
auto bucket = GetBucket(transformed);
|
|
if (bucket == nullptr) {
|
|
return false;
|
|
}
|
|
|
|
SkipListBucketHeader* skip_list_header = GetSkipListBucketHeader(bucket);
|
|
if (skip_list_header != nullptr) {
|
|
return skip_list_header->skip_list.Contains(key);
|
|
} else {
|
|
return LinkListContains(GetLinkListFirstNode(bucket), internal_key);
|
|
}
|
|
}
|
|
|
|
size_t HashLinkListRep::ApproximateMemoryUsage() {
|
|
// Memory is always allocated from the allocator.
|
|
return 0;
|
|
}
|
|
|
|
void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
|
|
bool (*callback_func)(void* arg, const char* entry)) {
|
|
auto transformed = transform_->Transform(k.user_key());
|
|
auto bucket = GetBucket(transformed);
|
|
|
|
auto* skip_list_header = GetSkipListBucketHeader(bucket);
|
|
if (skip_list_header != nullptr) {
|
|
// Is a skip list
|
|
MemtableSkipList::Iterator iter(&skip_list_header->skip_list);
|
|
for (iter.Seek(k.memtable_key().data());
|
|
iter.Valid() && callback_func(callback_args, iter.key());
|
|
iter.Next()) {
|
|
}
|
|
} else {
|
|
auto* link_list_head = GetLinkListFirstNode(bucket);
|
|
if (link_list_head != nullptr) {
|
|
LinkListIterator iter(this, link_list_head);
|
|
for (iter.Seek(k.internal_key(), nullptr);
|
|
iter.Valid() && callback_func(callback_args, iter.key());
|
|
iter.Next()) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
|
|
// allocate a new arena of similar size to the one currently in use
|
|
Arena* new_arena = new Arena(allocator_->BlockSize());
|
|
auto list = new MemtableSkipList(compare_, new_arena);
|
|
HistogramImpl keys_per_bucket_hist;
|
|
|
|
for (size_t i = 0; i < bucket_size_; ++i) {
|
|
int count = 0;
|
|
auto* bucket = GetBucket(i);
|
|
if (bucket != nullptr) {
|
|
auto* skip_list_header = GetSkipListBucketHeader(bucket);
|
|
if (skip_list_header != nullptr) {
|
|
// Is a skip list
|
|
MemtableSkipList::Iterator itr(&skip_list_header->skip_list);
|
|
for (itr.SeekToFirst(); itr.Valid(); itr.Next()) {
|
|
list->Insert(itr.key());
|
|
count++;
|
|
}
|
|
} else {
|
|
auto* link_list_head = GetLinkListFirstNode(bucket);
|
|
if (link_list_head != nullptr) {
|
|
LinkListIterator itr(this, link_list_head);
|
|
for (itr.SeekToHead(); itr.Valid(); itr.Next()) {
|
|
list->Insert(itr.key());
|
|
count++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (if_log_bucket_dist_when_flash_) {
|
|
keys_per_bucket_hist.Add(count);
|
|
}
|
|
}
|
|
if (if_log_bucket_dist_when_flash_ && logger_ != nullptr) {
|
|
Info(logger_, "hashLinkedList Entry distribution among buckets: %s",
|
|
keys_per_bucket_hist.ToString().c_str());
|
|
}
|
|
|
|
if (alloc_arena == nullptr) {
|
|
return new FullListIterator(list, new_arena);
|
|
} else {
|
|
auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator));
|
|
return new (mem) FullListIterator(list, new_arena);
|
|
}
|
|
}
|
|
|
|
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
|
|
Arena* alloc_arena) {
|
|
if (alloc_arena == nullptr) {
|
|
return new DynamicIterator(*this);
|
|
} else {
|
|
auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator));
|
|
return new (mem) DynamicIterator(*this);
|
|
}
|
|
}
|
|
|
|
bool HashLinkListRep::LinkListContains(Node* head,
|
|
const Slice& user_key) const {
|
|
Node* x = FindGreaterOrEqualInBucket(head, user_key);
|
|
return (x != nullptr && Equal(user_key, x->key));
|
|
}
|
|
|
|
Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
|
|
const Slice& key) const {
|
|
Node* x = head;
|
|
while (true) {
|
|
if (x == nullptr) {
|
|
return x;
|
|
}
|
|
Node* next = x->Next();
|
|
// Make sure the lists are sorted.
|
|
// If x points to head_ or next points nullptr, it is trivially satisfied.
|
|
assert((x == head) || (next == nullptr) || KeyIsAfterNode(next->key, x));
|
|
if (KeyIsAfterNode(key, x)) {
|
|
// Keep searching in this list
|
|
x = next;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
return x;
|
|
}
|
|
|
|
} // anon namespace
|
|
|
|
MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
|
|
const MemTableRep::KeyComparator& compare, Allocator* allocator,
|
|
const SliceTransform* transform, Logger* logger) {
|
|
return new HashLinkListRep(compare, allocator, transform, bucket_count_,
|
|
threshold_use_skiplist_, huge_page_tlb_size_,
|
|
logger, bucket_entries_logging_threshold_,
|
|
if_log_bucket_dist_when_flash_);
|
|
}
|
|
|
|
MemTableRepFactory* NewHashLinkListRepFactory(
|
|
size_t bucket_count, size_t huge_page_tlb_size,
|
|
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash,
|
|
uint32_t threshold_use_skiplist) {
|
|
return new HashLinkListRepFactory(
|
|
bucket_count, threshold_use_skiplist, huge_page_tlb_size,
|
|
bucket_entries_logging_threshold, if_log_bucket_dist_when_flash);
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|