From 782a1590f9d2eb97dd9c1f496e05e5bedb41f3c1 Mon Sep 17 00:00:00 2001 From: Giuseppe Ottaviano Date: Tue, 23 Jun 2015 10:25:45 -0700 Subject: [PATCH] Implement a table-level row cache Summary: Implementation of a table-level row cache. It only caches point queries done through the `DB::Get` interface, queries done through the `Iterator` interface will completely skip the cache. Supports snapshots and merge operations. Test Plan: Ran `make valgrind_check commit-prereq` Reviewers: igor, philipp, sdong Reviewed By: sdong Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D39849 --- HISTORY.md | 1 + db/db_bench.cc | 12 ++++ db/db_impl.cc | 1 - db/db_test.cc | 26 ++++++++- db/table_cache.cc | 90 ++++++++++++++++++++++++++--- db/table_cache.h | 1 + include/rocksdb/immutable_options.h | 2 + include/rocksdb/options.h | 5 ++ include/rocksdb/statistics.h | 7 +++ table/get_context.cc | 45 ++++++++++++++- table/get_context.h | 9 +++ util/options.cc | 13 ++++- 12 files changed, 199 insertions(+), 13 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 980cd3db15..aa139b42bb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### New Features * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) +* Added a cache for individual rows. See DBOptions::row_cache for more info. ### Public API changes * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. diff --git a/db/db_bench.cc b/db/db_bench.cc index 296a90c69c..9459b3460f 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -322,6 +322,10 @@ DEFINE_int32(block_restart_interval, DEFINE_int64(compressed_cache_size, -1, "Number of bytes to use as a cache of compressed data."); +DEFINE_int64(row_cache_size, 0, + "Number of bytes to use as a cache of individual rows" + " (0 = disabled)."); + DEFINE_int32(open_files, rocksdb::Options().max_open_files, "Maximum number of files to keep open at the same time" " (use default if == 0)"); @@ -2268,6 +2272,14 @@ class Benchmark { options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.filter_deletes = FLAGS_filter_deletes; + if (FLAGS_row_cache_size) { + if (FLAGS_cache_numshardbits >= 1) { + options.row_cache = + NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits); + } else { + options.row_cache = NewLRUCache(FLAGS_row_cache_size); + } + } if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash || FLAGS_rep_factory == kHashLinkedList)) { fprintf(stderr, "prefix_size should be non-zero if PrefixHash or " diff --git a/db/db_impl.cc b/db/db_impl.cc index d4e6482122..e8dee2c76d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -253,7 +253,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) // Give a large number for setting of "infinite" open files. const int table_cache_size = (db_options_.max_open_files == -1) ? 4194304 : db_options_.max_open_files - 10; - // Reserve ten files or so for other uses and give the rest to TableCache. table_cache_ = NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits); diff --git a/db/db_test.cc b/db/db_test.cc index f7f9f4cc17..5224c7ea6b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -417,7 +417,8 @@ class DBTest : public testing::Test { kxxHashChecksum = 25, kFIFOCompaction = 26, kOptimizeFiltersForHits = 27, - kEnd = 28 + kRowCache = 28, + kEnd = 29 }; int option_config_; @@ -707,6 +708,10 @@ class DBTest : public testing::Test { set_block_based_table_factory = true; break; } + case kRowCache: { + options.row_cache = NewLRUCache(1024 * 1024); + break; + } default: break; @@ -14017,6 +14022,25 @@ TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { } } +TEST_F(DBTest, RowCache) { + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + options.row_cache = NewLRUCache(8192); + DestroyAndReopen(options); + + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0); + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0); + ASSERT_EQ(Get("foo"), "bar"); + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0); + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1); + ASSERT_EQ(Get("foo"), "bar"); + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1); + ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/table_cache.cc b/db/table_cache.cc index e1b0ca8b9b..762029e8a4 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -9,6 +9,7 @@ #include "db/table_cache.h" +#include "db/dbformat.h" #include "db/filename.h" #include "db/version_edit.h" @@ -21,9 +22,12 @@ namespace rocksdb { +namespace { + +template static void DeleteEntry(const Slice& key, void* value) { - TableReader* table_reader = reinterpret_cast(value); - delete table_reader; + T* typed_value = reinterpret_cast(value); + delete typed_value; } static void UnrefEntry(void* arg1, void* arg2) { @@ -37,11 +41,27 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) { sizeof(*file_number)); } +#ifndef ROCKSDB_LITE + +void AppendVarint64(IterKey* key, uint64_t v) { + char buf[10]; + auto ptr = EncodeVarint64(buf, v); + key->TrimAppend(key->Size(), buf, ptr - buf); +} + +#endif // ROCKSDB_LITE + +} // namespace + TableCache::TableCache(const ImmutableCFOptions& ioptions, const EnvOptions& env_options, Cache* const cache) - : ioptions_(ioptions), - env_options_(env_options), - cache_(cache) {} + : ioptions_(ioptions), env_options_(env_options), cache_(cache) { + if (ioptions_.row_cache) { + // If the same cache is shared by multiple instances, we need to + // disambiguate its entries. + PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId()); + } +} TableCache::~TableCache() { } @@ -88,7 +108,8 @@ Status TableCache::FindTable(const EnvOptions& env_options, // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. } else { - *handle = cache_->Insert(key, table_reader.release(), 1, &DeleteEntry); + *handle = cache_->Insert(key, table_reader.release(), 1, + &DeleteEntry); } } return s; @@ -137,6 +158,46 @@ Status TableCache::Get(const ReadOptions& options, TableReader* t = fd.table_reader; Status s; Cache::Handle* handle = nullptr; + std::string* row_cache_entry = nullptr; + +#ifndef ROCKSDB_LITE + IterKey row_cache_key; + std::string row_cache_entry_buffer; + + if (ioptions_.row_cache) { + uint64_t fd_number = fd.GetNumber(); + auto user_key = ExtractUserKey(k); + // We use the user key as cache key instead of the internal key, + // otherwise the whole cache would be invalidated every time the + // sequence key increases. However, to support caching snapshot + // reads, we append the sequence number (incremented by 1 to + // distinguish from 0) only in this case. + uint64_t seq_no = + options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k); + + // Compute row cache key. + row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(), + row_cache_id_.size()); + AppendVarint64(&row_cache_key, fd_number); + AppendVarint64(&row_cache_key, seq_no); + row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(), + user_key.size()); + + if (auto row_handle = ioptions_.row_cache->Lookup(row_cache_key.GetKey())) { + auto found_row_cache_entry = static_cast( + ioptions_.row_cache->Value(row_handle)); + replayGetContextLog(*found_row_cache_entry, user_key, get_context); + ioptions_.row_cache->Release(row_handle); + RecordTick(ioptions_.statistics, ROW_CACHE_HIT); + return Status::OK(); + } + + // Not found, setting up the replay log. + RecordTick(ioptions_.statistics, ROW_CACHE_MISS); + row_cache_entry = &row_cache_entry_buffer; + } +#endif // ROCKSDB_LITE + if (!t) { s = FindTable(env_options_, internal_comparator, fd, &handle, options.read_tier == kBlockCacheTier); @@ -145,15 +206,30 @@ Status TableCache::Get(const ReadOptions& options, } } if (s.ok()) { + get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. s = t->Get(options, k, get_context); + get_context->SetReplayLog(nullptr); if (handle != nullptr) { ReleaseHandle(handle); } } else if (options.read_tier && s.IsIncomplete()) { - // Couldnt find Table in cache but treat as kFound if no_io set + // Couldn't find Table in cache but treat as kFound if no_io set get_context->MarkKeyMayExist(); return Status::OK(); } + +#ifndef ROCKSDB_LITE + // Put the replay log in row cache only if something was found. + if (s.ok() && row_cache_entry && !row_cache_entry->empty()) { + size_t charge = + row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string); + void* row_ptr = new std::string(std::move(*row_cache_entry)); + auto row_handle = ioptions_.row_cache->Insert( + row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry); + ioptions_.row_cache->Release(row_handle); + } +#endif // ROCKSDB_LITE + return s; } diff --git a/db/table_cache.h b/db/table_cache.h index 76bb1c0a2b..5212de717c 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -94,6 +94,7 @@ class TableCache { const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; Cache* const cache_; + std::string row_cache_id_; }; } // namespace rocksdb diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index c5c61d4e4e..fea483cc69 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -98,6 +98,8 @@ struct ImmutableCFOptions { // A vector of EventListeners which call-back functions will be called // when specific RocksDB event happens. std::vector> listeners; + + std::shared_ptr row_cache; }; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index cd62ddb4e1..7b4bd67638 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1055,6 +1055,11 @@ struct DBOptions { // Recovery mode to control the consistency while replaying WAL // Default: kTolerateCorruptedTailRecords WALRecoveryMode wal_recovery_mode; + + // A global cache for table-level rows. + // Default: nullptr (disabled) + // Not supported in ROCKSDB_LITE mode! + std::shared_ptr row_cache; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 4e06bf6a41..56d01aaf3a 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -141,6 +141,11 @@ enum Tickers : uint32_t { NUMBER_BLOCK_NOT_COMPRESSED, MERGE_OPERATION_TOTAL_TIME, FILTER_OPERATION_TOTAL_TIME, + + // Row cache. + ROW_CACHE_HIT, + ROW_CACHE_MISS, + TICKER_ENUM_MAX }; @@ -209,6 +214,8 @@ const std::vector> TickersNameMap = { {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"}, {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"}, {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"}, + {ROW_CACHE_HIT, "rocksdb.row.cache.hit"}, + {ROW_CACHE_MISS, "rocksdb.row.cache.miss"}, }; /** diff --git a/table/get_context.cc b/table/get_context.cc index e83aa1d863..5ac3525cd5 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -12,6 +12,24 @@ namespace rocksdb { +namespace { + +void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { +#ifndef ROCKSDB_LITE + if (replay_log) { + if (replay_log->empty()) { + // Optimization: in the common case of only one operation in the + // log, we allocate the exact amount of space needed. + replay_log->reserve(1 + VarintLength(value.size()) + value.size()); + } + replay_log->push_back(type); + PutLengthPrefixedSlice(replay_log, value); + } +#endif // ROCKSDB_LITE +} + +} // namespace + GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, @@ -26,7 +44,8 @@ GetContext::GetContext(const Comparator* ucmp, value_(ret_value), value_found_(value_found), merge_context_(merge_context), - env_(env) {} + env_(env), + replay_log_(nullptr) {} // Called from TableCache::Get and Table::Get when file/block in which // key may exist are not there in TableCache/BlockCache respectively. In this @@ -41,6 +60,9 @@ void GetContext::MarkKeyMayExist() { } void GetContext::SaveValue(const Slice& value) { + assert(state_ == kNotFound); + appendToReplayLog(replay_log_, kTypeValue, value); + state_ = kFound; value_->assign(value.data(), value.size()); } @@ -50,6 +72,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert((state_ != kMerge && parsed_key.type != kTypeMerge) || merge_context_ != nullptr); if (ucmp_->Compare(parsed_key.user_key, user_key_) == 0) { + appendToReplayLog(replay_log_, parsed_key.type, value); + // Key matches. Process it switch (parsed_key.type) { case kTypeValue: @@ -116,4 +140,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, return false; } +void replayGetContextLog(const Slice& replay_log, const Slice& user_key, + GetContext* get_context) { +#ifndef ROCKSDB_LITE + Slice s = replay_log; + while (s.size()) { + auto type = static_cast(*s.data()); + s.remove_prefix(1); + Slice value; + bool ret = GetLengthPrefixedSlice(&s, &value); + assert(ret); + (void)ret; + // Sequence number is ignored in SaveValue, so we just pass 0. + get_context->SaveValue(ParsedInternalKey(user_key, 0, type), value); + } +#else // ROCKSDB_LITE + assert(false); +#endif // ROCKSDB_LITE +} + } // namespace rocksdb diff --git a/table/get_context.h b/table/get_context.h index 700f23aebe..2c2dd8e1d8 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -31,6 +31,11 @@ class GetContext { bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value); GetState State() const { return state_; } + // If a non-null string is passed, all the SaveValue calls will be + // logged into the string. The operations can then be replayed on + // another GetContext with replayGetContextLog. + void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; } + private: const Comparator* ucmp_; const MergeOperator* merge_operator_; @@ -44,6 +49,10 @@ class GetContext { bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; Env* env_; + std::string* replay_log_; }; +void replayGetContextLog(const Slice& replay_log, const Slice& user_key, + GetContext* get_context); + } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 1244f498a2..57410b1612 100644 --- a/util/options.cc +++ b/util/options.cc @@ -71,8 +71,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) access_hint_on_compaction_start(options.access_hint_on_compaction_start), num_levels(options.num_levels), optimize_filters_for_hits(options.optimize_filters_for_hits), - listeners(options.listeners) { -} + listeners(options.listeners), + row_cache(options.row_cache) {} ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), @@ -290,7 +290,8 @@ DBOptions::DBOptions(const Options& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), delayed_write_rate(options.delayed_write_rate), - wal_recovery_mode(options.wal_recovery_mode) {} + wal_recovery_mode(options.wal_recovery_mode), + row_cache(options.row_cache) {} static const char* const access_hints[] = { "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" @@ -360,6 +361,12 @@ void DBOptions::Dump(Logger* log) const { wal_bytes_per_sync); Warn(log, " Options.enable_thread_tracking: %d", enable_thread_tracking); + if (row_cache) { + Warn(log, " Options.row_cache: %" PRIu64, + row_cache->GetCapacity()); + } else { + Warn(log, " Options.row_cache: None"); + } } // DBOptions::Dump void ColumnFamilyOptions::Dump(Logger* log) const {