From fc0c399d2e50a0fdc75e94e9a00df5721ad84cb6 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sat, 24 Aug 2013 22:48:51 -0700 Subject: [PATCH] Introduced a new flag non_blocking_io in ReadOptions. Summary: If ReadOptions.non_blocking_io is set to true, then KeyMayExist and Iterators will return data that is cached in RAM. If the Iterator needs to do IO from storage to serve the data, then the Iterator.status() will return Status::Incomplete(). Test Plan: Enhanced unit test DBTest.KeyMayExist to detect if any IOs were issued to storage. Added DBTest.NonBlockingIteration to verify nonblocking Iterations. Reviewers: emayanke, haobo Reviewed By: haobo CC: leveldb Maniphest Tasks: T63 Differential Revision: https://reviews.facebook.net/D12531 --- .gitignore | 1 + db/db_impl.cc | 7 +-- db/db_impl.h | 1 - db/db_test.cc | 94 +++++++++++++++++++++++++++++++++++++- db/table_cache.cc | 15 +++--- db/table_cache.h | 3 +- db/version_set.cc | 6 +-- db/version_set.h | 2 +- include/rocksdb/iterator.h | 2 + include/rocksdb/options.h | 24 +++++++++- include/rocksdb/status.h | 7 +++ table/table.cc | 18 ++++---- table/table.h | 6 +-- 13 files changed, 152 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index d987e57679..55f9639d54 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ util/build_version.cc build_tools/VALGRIND_LOGS/ coverage/COVERAGE_REPORT util/build_version.cc.tmp +.gdbhistory diff --git a/db/db_impl.cc b/db/db_impl.cc index f19ecba6c0..7f1efdb889 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2203,7 +2203,6 @@ Status DBImpl::Get(const ReadOptions& options, Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io, bool* value_found) { Status s; @@ -2242,7 +2241,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Done } else { current->Get(options, lkey, value, &s, &merge_operands, &stats, - options_, no_io, value_found); + options_, value_found); have_stat_update = true; 
} mutex_.Lock(); @@ -2348,7 +2347,9 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, if (value_found != nullptr) { *value_found = true; // falsify later if key-may-exist but can't fetch value } - return GetImpl(options, key, value, true, value_found).ok(); + ReadOptions roptions = options; + roptions.read_tier = kBlockCacheTier; // read from block cache only + return GetImpl(roptions, key, value, value_found).ok(); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { diff --git a/db/db_impl.h b/db/db_impl.h index 4d9b09c49f..6f4b5db426 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -424,7 +424,6 @@ class DBImpl : public DB { Status GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io = false, bool* value_found = nullptr); }; diff --git a/db/db_test.cc b/db/db_test.cc index faa24c6bac..c64b620c5f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,7 @@ #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "db/db_statistics.h" #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" @@ -829,6 +830,7 @@ TEST(DBTest, KeyMayExist) { std::string value; Options options = CurrentOptions(); options.filter_policy = NewBloomFilterPolicy(20); + options.statistics = leveldb::CreateDBStatistics(); Reopen(&options); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); @@ -841,24 +843,114 @@ TEST(DBTest, KeyMayExist) { dbfull()->Flush(FlushOptions()); value.clear(); - value_found = false; + + long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + long cache_miss = + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); ASSERT_TRUE(!value_found); + // assert that no new files were opened and no new blocks were + // read into block cache. 
+ ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); ASSERT_OK(db_->Delete(WriteOptions(), "a")); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); dbfull()->Flush(FlushOptions()); dbfull()->CompactRange(nullptr, nullptr); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); ASSERT_OK(db_->Delete(WriteOptions(), "c")); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); delete options.filter_policy; } while (ChangeOptions()); } +TEST(DBTest, NonBlockingIteration) { + do { + ReadOptions non_blocking_opts, regular_opts; + Options options = CurrentOptions(); + options.statistics = leveldb::CreateDBStatistics(); + non_blocking_opts.read_tier = kBlockCacheTier; + Reopen(&options); + + // write one kv to the database. + ASSERT_OK(db_->Put(WriteOptions(), "a", "b")); + + // scan using non-blocking iterator. We should find it because + // it is in memtable. 
+ Iterator* iter = db_->NewIterator(non_blocking_opts); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_TRUE(iter->status().ok()); + count++; + } + ASSERT_EQ(count, 1); + delete iter; + + // flush memtable to storage. Now, the key should be neither in the + // memtable nor in the block cache. + dbfull()->Flush(FlushOptions()); + + // verify that a non-blocking iterator does not find any + // kvs. Neither does it do any IOs to storage. + long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + long cache_miss = + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + iter = db_->NewIterator(non_blocking_opts); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + ASSERT_EQ(count, 0); + ASSERT_TRUE(iter->status().IsIncomplete()); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + delete iter; + + // read in the specified block via a regular get + ASSERT_EQ(Get("a"), "b"); + + // verify that we can find it via a non-blocking scan + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + iter = db_->NewIterator(non_blocking_opts); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_TRUE(iter->status().ok()); + count++; + } + ASSERT_EQ(count, 1); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + delete iter; + + } while (ChangeOptions()); +} + // A delete is skipped for key if KeyMayExist(key) returns False // Tests Writebatch consistency and proper delete behaviour TEST(DBTest, FilterDeletes) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 48d1177553..dc8769f48b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ 
-48,7 +48,7 @@ Status TableCache::FindTable(const EnvOptions& toptions, *handle = cache_->Lookup(key); if (*handle == nullptr) { if (no_io) { // Dont do IO and return a not-found status - return Status::NotFound("Table not found in table_cache, no_io is set"); + return Status::Incomplete("Table not found in table_cache, no_io is set"); } if (table_io != nullptr) { *table_io = true; // we had to do IO from storage @@ -90,7 +90,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, } Cache::Handle* handle = nullptr; - Status s = FindTable(toptions, file_number, file_size, &handle); + Status s = FindTable(toptions, file_number, file_size, &handle, + nullptr, options.read_tier == kBlockCacheTier); if (!s.ok()) { return NewErrorIterator(s); } @@ -117,17 +118,17 @@ Status TableCache::Get(const ReadOptions& options, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), bool* table_io, - void (*mark_key_may_exist)(void*), - const bool no_io) { + void (*mark_key_may_exist)(void*)) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, - &handle, table_io, no_io); + &handle, table_io, + options.read_tier == kBlockCacheTier); if (s.ok()) { Table* t = reinterpret_cast(cache_->Value(handle)); - s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io); + s = t->InternalGet(options, k, arg, saver, mark_key_may_exist); cache_->Release(handle); - } else if (no_io && s.IsNotFound()) { + } else if (options.read_tier && s.IsIncomplete()) { // Couldnt find Table in cache but treat as kFound if no_io set (*mark_key_may_exist)(arg); return Status::OK(); diff --git a/db/table_cache.h b/db/table_cache.h index d7308020c0..c9e68738bc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -49,8 +49,7 @@ class TableCache { void* arg, bool (*handle_result)(void*, const Slice&, const Slice&, bool), bool* table_io, - void (*mark_key_may_exist)(void*) = nullptr, - const bool no_io = false); + void 
(*mark_key_may_exist)(void*) = nullptr); // Determine whether the table may contain the specified prefix. If // the table index of blooms are not in memory, this may cause an I/O diff --git a/db/version_set.cc b/db/version_set.cc index dca8c72287..54be370bd9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -415,7 +415,6 @@ void Version::Get(const ReadOptions& options, std::deque* operands, GetStats* stats, const Options& db_options, - const bool no_io, bool* value_found) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -425,9 +424,6 @@ void Version::Get(const ReadOptions& options, auto logger = db_options.info_log; assert(status->ok() || status->IsMergeInProgress()); - if (no_io) { - assert(status->ok()); - } Saver saver; saver.state = status->ok()? kNotFound : kMerge; saver.ucmp = ucmp; @@ -516,7 +512,7 @@ void Version::Get(const ReadOptions& options, bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, - MarkKeyMayExist, no_io); + MarkKeyMayExist); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; diff --git a/db/version_set.h b/db/version_set.h index 9a7aeb20b9..9a10682978 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -76,7 +76,7 @@ class Version { }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, std::deque* operands, GetStats* stats, - const Options& db_option, const bool no_io = false, + const Options& db_option, bool* value_found = nullptr); // Adds "stats" into the current state. Returns true if a new diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 9dde6d70f4..4270e95f7d 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -65,6 +65,8 @@ class Iterator { virtual Slice value() const = 0; // If an error has occurred, return it. Else return an ok status. 
+ // If non-blocking IO is requested and this operation cannot be + // satisfied without doing some IO, then this returns Status::Incomplete(). virtual Status status() const = 0; // Clients are allowed to register function/arg1/arg2 triples that diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e3701af097..8e66811ac0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -543,6 +543,18 @@ struct Options { std::shared_ptr compaction_filter_factory; }; +// +// An application can issue a read request (via Get/Iterators) and specify +// if that read should process data that ALREADY resides on a specified cache +// level. For example, if an application specifies kBlockCacheTier then the +// Get call will process data that is already present in the memtable or +// the block cache. It will not page in data from the OS cache or data that +// resides in storage. +enum ReadTier { + kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage + kBlockCacheTier = 0x1 // data in memtable or block cache +}; + // Options that control read operations struct ReadOptions { // If true, all data read from underlying storage will be @@ -575,15 +587,23 @@ struct ReadOptions { // Default: nullptr const Slice* prefix; + // Specify if this read request should process data that ALREADY + // resides on a particular cache. If the required data is not + // found at the specified cache, then Status::Incomplete is returned. 
+ // Default: kReadAllTier + ReadTier read_tier; + ReadOptions() : verify_checksums(false), fill_cache(true), snapshot(nullptr), - prefix(nullptr) { + prefix(nullptr), + read_tier(kReadAllTier) { } ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), - snapshot(nullptr), prefix(nullptr) { + snapshot(nullptr), prefix(nullptr), + read_tier(kReadAllTier) { } }; diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index f8cdbc7a1d..f3af5bfabf 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -50,6 +50,9 @@ class Status { static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kMergeInProgress, msg, msg2); } + static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIncomplete, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return (state_ == nullptr); } @@ -72,6 +75,9 @@ class Status { // Returns true iff the status indicates an MergeInProgress. bool IsMergeInProgress() const { return code() == kMergeInProgress; } + // Returns true iff the status indicates Incomplete + bool IsIncomplete() const { return code() == kIncomplete; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. 
std::string ToString() const; @@ -92,6 +98,7 @@ class Status { kInvalidArgument = 4, kIOError = 5, kMergeInProgress = 6, + kIncomplete = 7 }; Code code() const { diff --git a/table/table.cc b/table/table.cc index 6d7ddb6ac2..f2b80cbbc3 100644 --- a/table/table.cc +++ b/table/table.cc @@ -237,8 +237,8 @@ Iterator* Table::BlockReader(void* arg, const ReadOptions& options, const Slice& index_value, bool* didIO, - bool for_compaction, - const bool no_io) { + bool for_compaction) { + const bool no_io = (options.read_tier == kBlockCacheTier); Table* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); std::shared_ptr statistics = table->rep_->options.statistics; @@ -268,7 +268,8 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_HIT); } else if (no_io) { - return nullptr; // Did not find in block_cache and can't do IO + // Did not find in block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); } else { Histograms histogram = for_compaction ? 
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; @@ -292,7 +293,8 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_MISS); } } else if (no_io) { - return nullptr; // Could not read from block_cache and can't do IO + // Could not read from block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); }else { s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO); } @@ -401,8 +403,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), - void (*mark_key_may_exist)(void*), - const bool no_io) { + void (*mark_key_may_exist)(void*)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); bool done = false; @@ -421,9 +422,10 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), - &didIO, false, no_io); + &didIO); - if (no_io && !block_iter) { // couldn't get block from block_cache + if (options.read_tier && block_iter->status().IsIncomplete()) { + // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set (*mark_key_may_exist)(arg); diff --git a/table/table.h b/table/table.h index a7014f911f..52d618f38b 100644 --- a/table/table.h +++ b/table/table.h @@ -79,8 +79,7 @@ class Table { const EnvOptions& soptions, const Slice&, bool for_compaction); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false, - const bool no_io = false); + bool* didIO, bool for_compaction = false); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. 
@@ -90,8 +89,7 @@ class Table { const ReadOptions&, const Slice& key, void* arg, bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), - void (*mark_key_may_exist)(void*) = nullptr, - const bool no_io = false); + void (*mark_key_may_exist)(void*) = nullptr); void ReadMeta(const Footer& footer);