From 3c6800610995c5eee3c04254e9f1d4cbef9e96a0 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Thu, 25 Sep 2014 11:14:01 -0700 Subject: [PATCH] CompactedDBImpl Summary: Add a CompactedDBImpl that will be enabled when calling OpenForReadOnly() and the DB only has one level (>0) of files. As a performance comparison, CuckooTable performs 2.1M/s with CompactedDBImpl vs. 1.78M/s with ReadOnlyDBImpl. Test Plan: db_bench Reviewers: yhchiang, igor, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D23553 --- db/db_bench.cc | 40 ++++++ db/db_impl.h | 1 + db/db_impl_readonly.cc | 46 ++----- db/db_impl_readonly.h | 13 -- db/db_test.cc | 74 +++++++++++ db/version_set.cc | 61 +++------ db/version_set.h | 23 ++++ table/cuckoo_table_reader.cc | 4 +- utilities/compacted_db/compacted_db_impl.cc | 132 ++++++++++++++++++++ utilities/compacted_db/compacted_db_impl.h | 92 ++++++++++++++ 10 files changed, 395 insertions(+), 91 deletions(-) create mode 100644 utilities/compacted_db/compacted_db_impl.cc create mode 100644 utilities/compacted_db/compacted_db_impl.h diff --git a/db/db_bench.cc b/db/db_bench.cc index d90c628a9f..926d8de69c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1262,6 +1262,8 @@ class Benchmark { method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { method = &Benchmark::ReadRandom; + } else if (name == Slice("readrandomfast")) { + method = &Benchmark::ReadRandomFast; } else if (name == Slice("multireadrandom")) { method = &Benchmark::MultiReadRandom; } else if (name == Slice("readmissing")) { @@ -2071,6 +2073,44 @@ class Benchmark { thread->stats.AddBytes(bytes); } + void ReadRandomFast(ThreadState* thread) { + int64_t read = 0; + int64_t found = 0; + ReadOptions options(FLAGS_verify_checksum, true); + Slice key = AllocateKey(); + std::unique_ptr key_guard(key.data()); + std::string value; + DB* db = SelectDBWithCfh(thread)->db; + + int64_t pot = 1; + while (pot < FLAGS_num) { + pot <<= 1; + } + + 
Duration duration(FLAGS_duration, reads_); + do { + for (int i = 0; i < 100; ++i) { + int64_t key_rand = thread->rand.Next() & (pot - 1); + GenerateKeyFromInt(key_rand, FLAGS_num, &key); + ++read; + if (db->Get(options, key, &value).ok()) { + ++found; + } + } + thread->stats.FinishedOps(db, 100); + } while (!duration.Done(100)); + + char msg[100]; + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", + found, read); + + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } + } + void ReadRandom(ThreadState* thread) { int64_t read = 0; int64_t found = 0; diff --git a/db/db_impl.h b/db/db_impl.h index 0bc2018b42..c6baf9c95d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -325,6 +325,7 @@ class DBImpl : public DB { friend class ForwardIterator; #endif friend struct SuperVersion; + friend class CompactedDBImpl; struct CompactionState; struct WriteContext; diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 8cea58736f..98e2bfeb0e 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -2,42 +2,12 @@ // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -// -// Copyright (c) 2012 Facebook. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. 
#include "db/db_impl_readonly.h" +#include "utilities/compacted_db/compacted_db_impl.h" #include "db/db_impl.h" - -#include -#include -#include -#include -#include -#include -#include "db/db_iter.h" -#include "db/dbformat.h" -#include "db/filename.h" -#include "db/log_reader.h" -#include "db/log_writer.h" -#include "db/memtable.h" #include "db/merge_context.h" -#include "db/table_cache.h" -#include "db/version_set.h" -#include "db/write_batch_internal.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/status.h" -#include "rocksdb/table.h" -#include "rocksdb/merge_operator.h" -#include "port/port.h" -#include "table/block.h" -#include "table/merger.h" -#include "table/two_level_iterator.h" -#include "util/coding.h" -#include "util/logging.h" -#include "util/build_version.h" +#include "db/db_iter.h" namespace rocksdb { @@ -120,6 +90,15 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, DB** dbptr, bool error_if_log_file_exist) { *dbptr = nullptr; + // Try to first open DB as fully compacted DB + Status s; +#ifndef ROCKSDB_LITE + s = CompactedDBImpl::Open(options, dbname, dbptr); + if (s.ok()) { + return s; + } +#endif + DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; @@ -127,8 +106,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; - Status s = - DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr); + s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 1dfdf422ef..9b10b83fbb 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -2,24 +2,11 @@ // This source code is licensed under the BSD-style license found in the // LICENSE 
file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -// -// Copyright (c) 2012 Facebook. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. #pragma once #include "db/db_impl.h" - -#include -#include #include #include -#include "db/dbformat.h" -#include "db/log_writer.h" -#include "db/snapshot.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "port/port.h" namespace rocksdb { diff --git a/db/db_test.cc b/db/db_test.cc index 7ad249d7fc..09e59f46c0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1270,6 +1270,80 @@ TEST(DBTest, ReadOnlyDB) { ASSERT_EQ("v2", Get("bar")); } +TEST(DBTest, CompactedDB) { + const uint64_t kFileSize = 1 << 20; + Options options; + options.disable_auto_compactions = true; + options.max_mem_compaction_level = 0; + options.write_buffer_size = kFileSize; + options.target_file_size_base = kFileSize; + options.max_bytes_for_level_base = 1 << 30; + options.compression = kNoCompression; + Reopen(&options); + // 1 L0 file, use CompactedDB if max_open_files = -1 + ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1'))); + Flush(); + Close(); + ASSERT_OK(ReadOnlyReopen(&options)); + Status s = Put("new", "value"); + ASSERT_EQ(s.ToString(), + "Not implemented: Not supported operation in read only mode."); + ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa")); + Close(); + options.max_open_files = -1; + ASSERT_OK(ReadOnlyReopen(&options)); + s = Put("new", "value"); + ASSERT_EQ(s.ToString(), + "Not implemented: Not supported in compacted db mode."); + ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa")); + Close(); + Reopen(&options); + // Add more L0 files + ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2'))); + Flush(); + ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a'))); + Flush(); + ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b'))); + Flush(); + 
Close(); + + ASSERT_OK(ReadOnlyReopen(&options)); + // Fallback to read-only DB + s = Put("new", "value"); + ASSERT_EQ(s.ToString(), + "Not implemented: Not supported operation in read only mode."); + Close(); + + // Full compaction + Reopen(&options); + // Add more keys + ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e'))); + ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f'))); + ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h'))); + ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i'))); + ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j'))); + db_->CompactRange(nullptr, nullptr); + ASSERT_EQ(3, NumTableFilesAtLevel(1)); + Close(); + + // CompactedDB + ASSERT_OK(ReadOnlyReopen(&options)); + s = Put("new", "value"); + ASSERT_EQ(s.ToString(), + "Not implemented: Not supported in compacted db mode."); + ASSERT_EQ("NOT_FOUND", Get("abc")); + ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa")); + ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb")); + ASSERT_EQ("NOT_FOUND", Get("ccc")); + ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee")); + ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff")); + ASSERT_EQ("NOT_FOUND", Get("ggg")); + ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh")); + ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii")); + ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj")); + ASSERT_EQ("NOT_FOUND", Get("kkk")); +} + // Make sure that when options.block_cache is set, after a new table is // created its index/filter blocks are added to block cache. 
TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) { diff --git a/db/version_set.cc b/db/version_set.cc index 7edfaa788e..0a46d7edc1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -626,46 +626,23 @@ void Version::AddIterators(const ReadOptions& read_options, } } -// Callback from TableCache::Get() -enum SaverState { - kNotFound, - kFound, - kDeleted, - kCorrupt, - kMerge // saver contains the current merge result (the operands) -}; - -namespace version_set { -struct Saver { - SaverState state; - const Comparator* ucmp; - Slice user_key; - bool* value_found; // Is value set correctly? Used by KeyMayExist - std::string* value; - const MergeOperator* merge_operator; - // the merge operations encountered; - MergeContext* merge_context; - Logger* logger; - Statistics* statistics; -}; -} // namespace version_set // Called from TableCache::Get and Table::Get when file/block in which // key may exist are not there in TableCache/BlockCache respectively. In this // case we can't guarantee that key does not exist and are not permitted to do // IO to be certain.Set the status=kFound and value_found=false to let the // caller know that key may exist but is not there in memory -static void MarkKeyMayExist(void* arg) { - version_set::Saver* s = reinterpret_cast(arg); - s->state = kFound; +void MarkKeyMayExist(void* arg) { + Version::Saver* s = reinterpret_cast(arg); + s->state = Version::kFound; if (s->value_found != nullptr) { *(s->value_found) = false; } } -static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, - const Slice& v) { - version_set::Saver* s = reinterpret_cast(arg); +bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, + const Slice& v) { + Version::Saver* s = reinterpret_cast(arg); MergeContext* merge_contex = s->merge_context; std::string merge_result; // temporary area for merge results later @@ -676,17 +653,17 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, // Key matches. 
Process it switch (parsed_key.type) { case kTypeValue: - if (kNotFound == s->state) { - s->state = kFound; + if (Version::kNotFound == s->state) { + s->state = Version::kFound; s->value->assign(v.data(), v.size()); - } else if (kMerge == s->state) { + } else if (Version::kMerge == s->state) { assert(s->merge_operator != nullptr); - s->state = kFound; + s->state = Version::kFound; if (!s->merge_operator->FullMerge(s->user_key, &v, merge_contex->GetOperands(), s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); - s->state = kCorrupt; + s->state = Version::kCorrupt; } } else { assert(false); @@ -694,15 +671,15 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, return false; case kTypeDeletion: - if (kNotFound == s->state) { - s->state = kDeleted; - } else if (kMerge == s->state) { - s->state = kFound; + if (Version::kNotFound == s->state) { + s->state = Version::kDeleted; + } else if (Version::kMerge == s->state) { + s->state = Version::kFound; if (!s->merge_operator->FullMerge(s->user_key, nullptr, merge_contex->GetOperands(), s->value, s->logger)) { RecordTick(s->statistics, NUMBER_MERGE_FAILURES); - s->state = kCorrupt; + s->state = Version::kCorrupt; } } else { assert(false); @@ -710,8 +687,8 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, return false; case kTypeMerge: - assert(s->state == kNotFound || s->state == kMerge); - s->state = kMerge; + assert(s->state == Version::kNotFound || s->state == Version::kMerge); + s->state = Version::kMerge; merge_contex->PushOperand(v); return true; @@ -779,7 +756,7 @@ void Version::Get(const ReadOptions& options, Slice user_key = k.user_key(); assert(status->ok() || status->IsMergeInProgress()); - version_set::Saver saver; + Saver saver; saver.state = status->ok()? 
kNotFound : kMerge; saver.ucmp = user_comparator_; saver.user_key = user_key; diff --git a/db/version_set.h b/db/version_set.h index 211fca1795..9e6cc1e34d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -241,10 +241,33 @@ class Version { FileMetaData* file; }; + enum SaverState { + kNotFound, + kFound, + kDeleted, + kCorrupt, + kMerge // saver contains the current merge result (the operands) + }; + + // Callback from TableCache::Get() + struct Saver { + SaverState state; + const Comparator* ucmp; + Slice user_key; + bool* value_found; // Is value set correctly? Used by KeyMayExist + std::string* value; + const MergeOperator* merge_operator; + // the merge operations encountered; + MergeContext* merge_context; + Logger* logger; + Statistics* statistics; + }; + private: friend class Compaction; friend class VersionSet; friend class DBImpl; + friend class CompactedDBImpl; friend class ColumnFamilyData; friend class CompactionPicker; friend class LevelCompactionPicker; diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index 63b8a2c8c1..b8ac5a47ea 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -120,9 +120,9 @@ Status CuckooTableReader::Get( get_slice_hash_); const char* bucket = &file_data_.data()[offset]; for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_; - ++block_idx, bucket += bucket_length_) { + ++block_idx, bucket += bucket_length_) { if (ucomp_->Compare(Slice(unused_key_.data(), user_key.size()), - Slice(bucket, user_key.size())) == 0) { + Slice(bucket, user_key.size())) == 0) { return Status::OK(); } // Here, we compare only the user key part as we support only one entry diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc new file mode 100644 index 0000000000..07dc71ea94 --- /dev/null +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -0,0 +1,132 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#include "utilities/compacted_db/compacted_db_impl.h" +#include "db/db_impl.h" +#include "db/version_set.h" +#include "db/merge_context.h" + +namespace rocksdb { + +extern void MarkKeyMayExist(void* arg); +extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, + const Slice& v); + +CompactedDBImpl::CompactedDBImpl( + const DBOptions& options, const std::string& dbname) + : DBImpl(options, dbname) { +} + +CompactedDBImpl::~CompactedDBImpl() { +} + +Status CompactedDBImpl::Get(const ReadOptions& options, + ColumnFamilyHandle*, const Slice& key, std::string* value) { + size_t left = 0; + size_t right = files_.num_files - 1; + while (left < right) { + size_t mid = (left + right) >> 1; + const FdWithKeyRange& f = files_.files[mid]; + if (user_comparator_->Compare(ExtractUserKey(f.largest_key), key) < 0) { + // Key at "mid.largest" is < "target". Therefore all + // files at or before "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "mid.largest" is >= "target". Therefore all files + // after "mid" are uninteresting. 
+ right = mid; + } + } + const FdWithKeyRange& f = files_.files[right]; + + bool value_found; + MergeContext merge_context; + Version::Saver saver; + saver.state = Version::kNotFound; + saver.ucmp = user_comparator_; + saver.user_key = key; + saver.value_found = &value_found; + saver.value = value; + saver.merge_operator = nullptr; + saver.merge_context = &merge_context; + saver.logger = info_log_; + saver.statistics = statistics_; + LookupKey lkey(key, kMaxSequenceNumber); + f.fd.table_reader->Get(options, lkey.internal_key(), + reinterpret_cast(&saver), SaveValue, + MarkKeyMayExist); + if (saver.state == Version::kFound) { + return Status::OK(); + } + return Status::NotFound(); +} + +Status CompactedDBImpl::Init(const Options& options) { + mutex_.Lock(); + ColumnFamilyDescriptor cf(kDefaultColumnFamilyName, + ColumnFamilyOptions(options)); + Status s = Recover({ cf }, true /* read only */, false); + if (s.ok()) { + cfd_ = reinterpret_cast( + DefaultColumnFamily())->cfd(); + delete cfd_->InstallSuperVersion(new SuperVersion(), &mutex_); + } + mutex_.Unlock(); + if (!s.ok()) { + return s; + } + version_ = cfd_->GetSuperVersion()->current; + user_comparator_ = cfd_->user_comparator(); + statistics_ = cfd_->ioptions()->statistics; + info_log_ = cfd_->ioptions()->info_log; + // L0 should not have files + if (version_->file_levels_[0].num_files > 1) { + return Status::NotSupported("L0 contain more than 1 file"); + } + if (version_->file_levels_[0].num_files == 1) { + if (version_->num_non_empty_levels_ > 1) { + return Status::NotSupported("Both L0 and other level contain files"); + } + files_ = version_->file_levels_[0]; + return Status::OK(); + } + + for (int i = 1; i < version_->num_non_empty_levels_ - 1; ++i) { + if (version_->file_levels_[i].num_files > 0) { + return Status::NotSupported("Other levels also contain files"); + } + } + + int level = version_->num_non_empty_levels_ - 1; + if (version_->file_levels_[level].num_files > 0) { + files_ = 
version_->file_levels_[version_->num_non_empty_levels_ - 1]; + return Status::OK(); + } + return Status::NotSupported("no file exists"); +} + +Status CompactedDBImpl::Open(const Options& options, + const std::string& dbname, DB** dbptr) { + *dbptr = nullptr; + + if (options.max_open_files != -1) { + return Status::InvalidArgument("require max_open_files = -1"); + } + if (options.merge_operator.get() != nullptr) { + return Status::InvalidArgument("merge operator is not supported"); + } + DBOptions db_options(options); + std::unique_ptr db(new CompactedDBImpl(db_options, dbname)); + Status s = db->Init(options); + if (s.ok()) { + *dbptr = db.release(); + Log(options.info_log, "Opened the db as fully compacted mode"); + } + return s; +} + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/compacted_db/compacted_db_impl.h b/utilities/compacted_db/compacted_db_impl.h new file mode 100644 index 0000000000..8237a2cdd6 --- /dev/null +++ b/utilities/compacted_db/compacted_db_impl.h @@ -0,0 +1,92 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#pragma once +#ifndef ROCKSDB_LITE +#include "db/db_impl.h" +#include +#include + +namespace rocksdb { + +class CompactedDBImpl : public DBImpl { + public: + CompactedDBImpl(const DBOptions& options, const std::string& dbname); + virtual ~CompactedDBImpl(); + + static Status Open(const Options& options, const std::string& dbname, + DB** dbptr); + + // Implementations of the DB interface + using DB::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) override; + + using DBImpl::Put; + virtual Status Put(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + using DBImpl::Merge; + virtual Status Merge(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + using DBImpl::Delete; + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + virtual Status Write(const WriteOptions& options, + WriteBatch* updates) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + using DBImpl::CompactRange; + virtual Status CompactRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end, + bool reduce_level = false, int target_level = -1, + uint32_t target_path_id = 0) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + + virtual Status DisableFileDeletions() override { + return Status::NotSupported("Not supported in compacted db mode."); + } + virtual Status EnableFileDeletions(bool force) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + virtual Status 
GetLiveFiles(std::vector&, + uint64_t* manifest_file_size, + bool flush_memtable = true) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + using DBImpl::Flush; + virtual Status Flush(const FlushOptions& options, + ColumnFamilyHandle* column_family) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + + private: + friend class DB; + Status Init(const Options& options); + + ColumnFamilyData* cfd_; + Version* version_; + const Comparator* user_comparator_; + FileLevel files_; + + Statistics* statistics_; + Logger* info_log_; + + // No copying allowed + CompactedDBImpl(const CompactedDBImpl&); + void operator=(const CompactedDBImpl&); +}; +} +#endif // ROCKSDB_LITE