diff --git a/include/rocksdb/plain_table_factory.h b/include/rocksdb/plain_table_factory.h index 3d26c6e4ea..2355e43d44 100644 --- a/include/rocksdb/plain_table_factory.h +++ b/include/rocksdb/plain_table_factory.h @@ -23,7 +23,9 @@ class TableBuilder; // IndexedTable requires fixed length key, configured as a constructor // parameter of the factory class. Output file format: -// +--------------------------------------------+ <= key1 offset +// +-------------+ +// | version | +// +-------------+------------------------------+ <= key1 offset // | key1 | value_size (4 bytes) | | // +----------------------------------------+ | // | value1 | diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc index ed0b4d9884..5a6e41df60 100644 --- a/table/plain_table_builder.cc +++ b/table/plain_table_builder.cc @@ -25,6 +25,10 @@ PlainTableBuilder::PlainTableBuilder(const Options& options, int user_key_size, int key_prefix_len) : options_(options), file_(file), user_key_size_(user_key_size), key_prefix_len_(key_prefix_len) { + std::string version; + PutFixed32(&version, 1 | 0x80000000); + file_->Append(Slice(version)); + offset_ = 4; } PlainTableBuilder::~PlainTableBuilder() { @@ -43,11 +47,11 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) { std::string size; int value_size = value.size(); - PutFixed32(&size, value_size); + PutVarint32(&size, value_size); Slice sizeSlice(size); file_->Append(sizeSlice); file_->Append(value); - offset_ += value_size + 4; + offset_ += value_size + size.length(); num_entries_++; } diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 9d0283b224..e7f48df338 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -5,6 +5,7 @@ #include "table/plain_table_reader.h" #include +#include #include "db/dbformat.h" @@ -23,21 +24,35 @@ #include "util/coding.h" #include "util/hash.h" #include "util/histogram.h" +#include "util/murmurhash.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" + namespace std { template<> struct hash { public: std::size_t operator()(rocksdb::Slice const& s) const { - return rocksdb::Hash(s.data(), s.size(), 397); + return MurmurHash(s.data(), s.size(), 397); + } +}; + +class slice_comparator { +public: + bool operator()(rocksdb::Slice const& s1, rocksdb::Slice const& s2) { + return s1.compare(s2) < 0; } }; } namespace rocksdb { +static uint32_t getBucketId(Slice const& s, size_t prefix_len, + uint32_t num_buckets) { + return MurmurHash(s.data(), prefix_len, 397) % num_buckets; +} + PlainTableReader::PlainTableReader(const EnvOptions& storage_options, uint64_t file_size, int user_key_size, int key_prefix_len, int bloom_bits_per_key, @@ -51,6 +66,8 @@ PlainTableReader::PlainTableReader(const EnvOptions& storage_options, filter_policy_ = nullptr; } hash_table_ = nullptr; + data_start_offset_ = 0; + data_end_offset_ = file_size; } PlainTableReader::~PlainTableReader() { @@ -73,6 +90,10 @@ Status PlainTableReader::Open(const Options& options, double hash_table_ratio) { assert(options.allow_mmap_reads); + if (file_size > 2147483646) { + return Status::NotSupported("File is too large for PlainTableReader!"); + } + PlainTableReader* t = new PlainTableReader(soptions, file_size, user_key_size, key_prefix_len, @@ -101,104 +122,111 @@ Iterator* PlainTableReader::NewIterator(const ReadOptions& options) { } Status PlainTableReader::PopulateIndex(uint64_t file_size) { + // Get mmapped memory to file_data_. + Status s = file_->Read(0, file_size_, &file_data_, nullptr); + if (!s.ok()) { + return s; + } + version_ = DecodeFixed32(file_data_.data()); + version_ ^= 0x80000000; + assert(version_ == 1); + data_start_offset_ = 4; + data_end_offset_ = file_size; + Slice key_slice; Slice key_prefix_slice; Slice key_suffix_slice; Slice value_slice; - Slice tmp_slice; Slice prev_key_prefix_slice; - uint64_t pos = 0; - uint64_t data_offset_for_cur_prefix = 0; - int count_prefix = 0; + uint32_t pos = data_start_offset_; + int key_index_within_prefix = 0; bool first = true; std::string prefix_sub_index; HistogramImpl keys_per_prefix_hist; - std::unordered_map tmp_index; + // Need map to be ordered to make sure sub indexes generated + // are in order. + std::map prefix2map; while (pos < file_size) { - uint64_t key_offset = pos; - pos = Next(pos, &key_slice, &value_slice, &tmp_slice); + uint32_t key_offset = pos; + status_ = Next(pos, &key_slice, &value_slice, pos); key_prefix_slice = Slice(key_slice.data(), key_prefix_len_); if (first || prev_key_prefix_slice != key_prefix_slice) { if (!first) { - if (count_prefix < 8 || key_prefix_len_ == user_key_size_) { - tmp_index[prev_key_prefix_slice] = data_offset_for_cur_prefix; - } else { - tmp_index[prev_key_prefix_slice] = sub_index_.length() - | kSubIndexMask; - PutFixed32(&sub_index_, (count_prefix - 1) / 8 + 1); - sub_index_.append(prefix_sub_index); - } - prefix_sub_index.clear(); - data_offset_for_cur_prefix = key_offset; - keys_per_prefix_hist.Add(count_prefix); + keys_per_prefix_hist.Add(key_index_within_prefix); } + key_index_within_prefix = 0; prev_key_prefix_slice = key_prefix_slice; - count_prefix = 1; - } else { - count_prefix++; - } - if (key_prefix_len_ < user_key_size_ && count_prefix % 8 == 1) { - prefix_sub_index.append(key_slice.data() + key_prefix_len_, - user_key_size_ - key_prefix_len_); - PutFixed64(&prefix_sub_index, key_offset); } + if (key_index_within_prefix++ % 8 == 0) { + // Add an index key for every 8 keys + std::string& prefix_index = prefix2map[key_prefix_slice]; + PutFixed32(&prefix_index, key_offset); + } first = false; } - keys_per_prefix_hist.Add(count_prefix); - if (count_prefix <= 2 || key_prefix_len_ == user_key_size_) { - tmp_index[prev_key_prefix_slice] = data_offset_for_cur_prefix; - } else { - tmp_index[prev_key_prefix_slice] = sub_index_.length() | kSubIndexMask; - PutFixed32(&sub_index_, (count_prefix - 1) / 8 + 1); - sub_index_.append(prefix_sub_index); - } - + keys_per_prefix_hist.Add(key_index_within_prefix); if (hash_table_ != nullptr) { delete[] hash_table_; } - // Make the hash table 3/5 full std::vector filter_entries(0); // for creating bloom filter; if (filter_policy_ != nullptr) { - filter_entries.reserve(tmp_index.size()); + filter_entries.reserve(prefix2map.size()); } double hash_table_size_multipier = (hash_table_ratio_ > 1.0) ? 1.0 : 1.0 / hash_table_ratio_; - hash_table_size_ = tmp_index.size() * hash_table_size_multipier + 1; - hash_table_ = new char[GetHashTableRecordLen() * hash_table_size_]; - for (int i = 0; i < hash_table_size_; i++) { - memcpy(GetHashTableBucketPtr(i) + key_prefix_len_, &file_size_, - kOffsetLen); + hash_table_size_ = prefix2map.size() * hash_table_size_multipier + 1; + hash_table_ = new uint32_t[hash_table_size_]; + std::vector hash2map(hash_table_size_); + + size_t sub_index_size_needed = 0; + for (auto& p: prefix2map) { + auto& sub_index = hash2map[getBucketId(p.first, key_prefix_len_, + hash_table_size_)]; + if (sub_index.length() > 0 || p.second.length() > kOffsetLen) { + if (sub_index.length() <= kOffsetLen) { + sub_index_size_needed += sub_index.length() + 4; + } + sub_index_size_needed += p.second.length(); + } + sub_index.append(p.second); + if (filter_policy_ != nullptr) { + filter_entries.push_back(p.first); + } } - for (auto it = tmp_index.begin(); it != tmp_index.end(); ++it) { - if (filter_policy_ != nullptr) { - filter_entries.push_back(it->first); + sub_index_.clear(); + Log(options_.info_log, "Reserving %zu bytes for sub index", + sub_index_size_needed); + sub_index_.reserve(sub_index_size_needed); + for (int i = 0; i < hash_table_size_; i++) { + uint32_t num_keys_for_bucket = hash2map[i].length() / kOffsetLen; + switch (num_keys_for_bucket) { + case 0: + // No key for bucket + hash_table_[i] = data_end_offset_; + break; + case 1: + // point directly to the file offset + hash_table_[i] = DecodeFixed32(hash2map[i].data()); + break; + default: + // point to index block + hash_table_[i] = sub_index_.length() | kSubIndexMask; + PutFixed32(&sub_index_, num_keys_for_bucket); + sub_index_.append(hash2map[i]); } - - int bucket = GetHashTableBucket(it->first); - uint64_t* hash_value; - while (true) { - GetHashValue(bucket, &hash_value); - if (*hash_value == file_size_) { - break; - } - bucket = (bucket + 1) % hash_table_size_; - } - - char* bucket_ptr = GetHashTableBucketPtr(bucket); - memcpy(bucket_ptr, it->first.data(), key_prefix_len_); - memcpy(bucket_ptr + key_prefix_len_, &it->second, kOffsetLen); } if (filter_policy_ != nullptr) { + filter_str_.clear(); filter_policy_->CreateFilter(&filter_entries[0], filter_entries.size(), &filter_str_); filter_slice_ = Slice(filter_str_.data(), filter_str_.size()); } - Log(options_.info_log, "Number of prefixes: %d, suffix_map length %ld", + Log(options_.info_log, "hash table size: %d, suffix_map length %zu", hash_table_size_, sub_index_.length()); Log(options_.info_log, "Number of Keys per prefix Histogram: %s", keys_per_prefix_hist.ToString().c_str()); @@ -206,51 +234,33 @@ Status PlainTableReader::PopulateIndex(uint64_t file_size) { return Status::OK(); } -inline int PlainTableReader::GetHashTableBucket(Slice key) { - return rocksdb::Hash(key.data(), key_prefix_len_, 397) % hash_table_size_; -} - -inline void PlainTableReader::GetHashValue(int bucket, uint64_t** ret_value) { - *ret_value = (uint64_t*) (GetHashTableBucketPtr(bucket) + key_prefix_len_); -} - -Status PlainTableReader::GetOffset(const Slice& target, uint64_t* offset) { - Status s; - int bucket = GetHashTableBucket(target); - uint64_t* found_value; - Slice hash_key; - while (true) { - GetHashValue(bucket, &found_value); - if (*found_value == file_size_) { - break; - } - GetHashKey(bucket, &hash_key); - if (target.starts_with(hash_key)) { - break; - } - bucket = (bucket + 1) % hash_table_size_; - } - - if (*found_value == file_size_ || (*found_value & kSubIndexMask) == 0) { - *offset = *found_value; - return Status::OK(); +uint32_t PlainTableReader::GetOffset(const Slice& target, + bool& prefix_matched) { + prefix_matched = false; + int bucket = getBucketId(target, key_prefix_len_, hash_table_size_); + uint32_t bucket_value = hash_table_[bucket]; + if (bucket_value == data_end_offset_) { + return data_end_offset_; + } else if ((bucket_value & kSubIndexMask) == 0) { + // point directly to the file + return bucket_value; } + // point to sub-index, need to do a binary search uint32_t low = 0; - uint64_t prefix_index_offset = *found_value ^ kSubIndexMask; - uint32_t high = DecodeFixed32(sub_index_.data() + prefix_index_offset); + uint64_t prefix_index_offset = bucket_value ^ kSubIndexMask; + uint32_t upper_bound = DecodeFixed32(sub_index_.data() + prefix_index_offset); + uint32_t high = upper_bound; uint64_t base_offset = prefix_index_offset + 4; - char* mid_key_str = new char[target.size()]; - memcpy(mid_key_str, target.data(), target.size()); - Slice mid_key = Slice(mid_key_str, target.size()); + Slice mid_key; - // The key is between (low, high). Do a binary search between it. + // The key is between [low, high). Do a binary search between it. while (high - low > 1) { uint32_t mid = (high + low) / 2; - const char* base = sub_index_.data() + base_offset - + (user_key_size_ - key_prefix_len_ + kOffsetLen) * mid; - memcpy(mid_key_str + key_prefix_len_, base, - user_key_size_ - key_prefix_len_); + const char* index_offset = sub_index_.data() + base_offset + + kOffsetLen * mid; + uint32_t file_offset = DecodeFixed32(index_offset); + mid_key = Slice(file_data_.data() + file_offset, user_key_size_); int cmp_result = options_.comparator->Compare(target, mid_key); if (cmp_result > 0) { @@ -259,21 +269,32 @@ Status PlainTableReader::GetOffset(const Slice& target, uint64_t* offset) { if (cmp_result == 0) { // Happen to have found the exact key or target is smaller than the // first key after base_offset. - *offset = DecodeFixed64(base + user_key_size_ - key_prefix_len_); - delete[] mid_key_str; - return s; + prefix_matched = true; + return file_offset; } else { high = mid; } } } - const char* base = sub_index_.data() + base_offset - + (user_key_size_ - key_prefix_len_ + kOffsetLen) * low; - *offset = DecodeFixed64(base + user_key_size_ - key_prefix_len_); - - delete[] mid_key_str; - return s; + // The key is between low and low+1 (if exists). Both of them can have the + // correct prefix. Need to rule out at least one, to avoid to miss the + // correct one. + uint32_t low_key_offset = DecodeFixed32( + sub_index_.data() + base_offset + kOffsetLen * low); + if (low + 1 < upper_bound) { + if (Slice(file_data_.data() + low_key_offset, key_prefix_len_) + == Slice(target.data(), key_prefix_len_)) { + prefix_matched = true; + } else { + prefix_matched = false; + return DecodeFixed32( + sub_index_.data() + base_offset + kOffsetLen * (low + 1)); + } + } else { + prefix_matched = false; + } + return low_key_offset; } bool PlainTableReader::MayHavePrefix(const Slice& target_prefix) { @@ -282,46 +303,74 @@ bool PlainTableReader::MayHavePrefix(const Slice& target_prefix) { } -uint64_t PlainTableReader::Next(uint64_t offset, Slice* key, Slice* value, - Slice* tmp_slice) { - if (offset >= file_size_) { - return file_size_; +Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value, + uint32_t& next_offset) { + if (offset == data_end_offset_) { + next_offset = data_end_offset_; + return Status::OK(); } + + if (offset > data_end_offset_) { + return Status::Corruption("Offset is out of file size"); + } + int internal_key_size = GetInternalKeyLength(); + if (offset + internal_key_size >= data_end_offset_) { + return Status::Corruption("Un able to read the next key"); + } - Status s = file_->Read(offset, internal_key_size, key, nullptr); - offset += internal_key_size; + const char* key_ptr = file_data_.data() + offset; + *key = Slice(key_ptr, internal_key_size); - s = file_->Read(offset, 4, tmp_slice, nullptr); - offset += 4; - uint32_t value_size = DecodeFixed32(tmp_slice->data()); + uint32_t value_size; + const char* value_ptr = GetVarint32Ptr(key_ptr + internal_key_size, + file_data_.data() + data_end_offset_, + &value_size); + if (value_ptr == nullptr) { + return Status::Corruption("Error reading value length."); + } + next_offset = offset + (value_ptr - key_ptr) + value_size; + if (next_offset > data_end_offset_) { + return Status::Corruption("Reach end of file when reading value"); + } + *value = Slice(value_ptr, value_size); - s = file_->Read(offset, value_size, value, nullptr); - offset += value_size; - - return offset; + return Status::OK(); } Status PlainTableReader::Get( const ReadOptions& ro, const Slice& target, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), void (*mark_key_may_exist)(void*)) { - uint64_t offset; - Status s = GetOffset(target, &offset); - if (!s.ok()) { - return s; + // Check bloom filter first. + if (!MayHavePrefix(Slice(target.data(), key_prefix_len_))) { + return Status::OK(); } + + uint32_t offset; + bool prefix_match; + offset = GetOffset(target, prefix_match); Slice found_key; Slice found_value; - Slice tmp_slice; - while (offset < file_size_) { - offset = Next(offset, &found_key, &found_value, &tmp_slice); + while (offset < data_end_offset_) { + Status s = Next(offset, &found_key, &found_value, offset); + if (!s.ok()) { + return s; + } + if (!prefix_match) { + // Need to verify prefix for the first key found if it is not yet + // checked. + if (!target.starts_with(Slice(found_key.data(), key_prefix_len_))) { + break; + } + prefix_match = true; + } if (options_.comparator->Compare(found_key, target) >= 0 && !(*saver)(arg, found_key, found_value, true)) { break; } } - return s; + return Status::OK(); } bool PlainTableReader::TEST_KeyInCache(const ReadOptions& options, @@ -342,11 +391,12 @@ PlainTableIterator::~PlainTableIterator() { } bool PlainTableIterator::Valid() const { - return offset_ < table_->file_size_ && offset_ >= 0; + return offset_ < table_->data_end_offset_ + && offset_ >= table_->data_start_offset_; } void PlainTableIterator::SeekToFirst() { - next_offset_ = 0; + next_offset_ = table_->data_start_offset_; Next(); } @@ -356,26 +406,35 @@ void PlainTableIterator::SeekToLast() { void PlainTableIterator::Seek(const Slice& target) { if (!table_->MayHavePrefix(Slice(target.data(), table_->key_prefix_len_))) { - offset_ = next_offset_ = table_->file_size_; + offset_ = next_offset_ = table_->data_end_offset_; return; } + bool prefix_match; + next_offset_ = table_->GetOffset(target, prefix_match); - Status s = table_->GetOffset(target, &next_offset_); - if (!s.ok()) { - status_ = s; - } - if (next_offset_ < table_->file_size_) { - for (Next(); - Valid() && table_->options_.comparator->Compare(key(), target) < 0; - Next()) { + if (next_offset_ < table_-> data_end_offset_) { + for (Next(); status_.ok() && Valid(); Next()) { + if (!prefix_match) { + // Need to verify the first key's prefix + if (!target.starts_with(Slice(key().data(), table_->key_prefix_len_))) { + offset_ = next_offset_ = table_->data_end_offset_; + break; + } + prefix_match = true; + } + if (table_->options_.comparator->Compare(key(), target) >= 0) { + break; + } } + } else { + offset_ = table_->data_end_offset_; } } void PlainTableIterator::Next() { offset_ = next_offset_; Slice tmp_slice; - next_offset_ = table_->Next(next_offset_, &key_, &value_, &tmp_slice); + status_ = table_->Next(next_offset_, &key_, &value_, next_offset_); } void PlainTableIterator::Prev() { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index d9ac343266..eea8adfe6b 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -25,7 +25,9 @@ using std::unique_ptr; using std::unordered_map; // Based on following output file format: -// +--------------------------------------------+ <= key1_data_offset +// +-------------+ +// | version | +// +-------------+------------------------------+ <= key1_data_offset // | key1 | value_size (4 bytes) | | // +----------------------------------------+ | // | value1 | @@ -85,7 +87,7 @@ public: ~PlainTableReader(); private: - char* hash_table_; + uint32_t* hash_table_; int hash_table_size_; std::string sub_index_; @@ -94,7 +96,11 @@ private: Status status_; unique_ptr file_; - uint64_t file_size_; + Slice file_data_; + uint32_t version_; + uint32_t file_size_; + uint32_t data_start_offset_; + uint32_t data_end_offset_; const size_t user_key_size_; const size_t key_prefix_len_; const double hash_table_ratio_; @@ -105,32 +111,34 @@ private: TableProperties tbl_props; static const size_t kNumInternalBytes = 8; - static const uint64_t kSubIndexMask = 0x8000000000000000; - static const size_t kOffsetLen = sizeof(uint64_t); + static const uint32_t kSubIndexMask = 0x80000000; + static const size_t kOffsetLen = sizeof(uint32_t); - inline int GetHashTableBucket(Slice key); inline size_t GetInternalKeyLength() { return user_key_size_ + kNumInternalBytes; } - inline size_t GetHashTableRecordLen() { - return key_prefix_len_ + kOffsetLen; - } - inline char* GetHashTableBucketPtr(int bucket) { - return hash_table_ + GetHashTableRecordLen() * bucket; - } - inline void GetHashKey(int bucket, Slice* slice) { - *slice = Slice(GetHashTableBucketPtr(bucket), key_prefix_len_); - } - inline void GetHashValue(int bucket, uint64_t** ret_value); friend class TableCache; friend class PlainTableIterator; + // Populate the internal indexes. It must be called before + // any query to the table. + // This query will populate the hash table hash_table_, the second + // level of indexes sub_index_ and bloom filter filter_slice_ if enabled. Status PopulateIndex(uint64_t file_size); - uint64_t Next(uint64_t offset, Slice* key, Slice* value, Slice* tmp_slice); - Status GetOffset(const Slice& target, uint64_t* offset); + + // Check bloom filter to see whether it might contain this prefix bool MayHavePrefix(const Slice& target_prefix); + // Read the key and value at offset to key and value. + // tmp_slice is a tmp slice. + // return next_offset as the offset for the next key. + Status Next(uint32_t offset, Slice* key, Slice* value, uint32_t& next_offset); + // Get file offset for key target. + // return value prefix_matched is set to true if the offset is confirmed + // for a key with the same prefix as target. + uint32_t GetOffset(const Slice& target, bool& prefix_matched); + // No copying allowed explicit PlainTableReader(const TableReader&) = delete; void operator=(const TableReader&) = delete; @@ -162,8 +170,8 @@ public: private: PlainTableReader* table_; - uint64_t offset_; - uint64_t next_offset_; + uint32_t offset_; + uint32_t next_offset_; Slice key_; Slice value_; Status status_;