diff --git a/CMakeLists.txt b/CMakeLists.txt index 676192913c..e2685f7064 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -571,6 +571,7 @@ set(SOURCES db/arena_wrapped_db_iter.cc db/blob/blob_file_addition.cc db/blob/blob_file_builder.cc + db/blob/blob_file_cache.cc db/blob/blob_file_garbage.cc db/blob/blob_file_meta.cc db/blob/blob_file_reader.cc @@ -1044,8 +1045,10 @@ if(WITH_TESTS) cache/lru_cache_test.cc db/blob/blob_file_addition_test.cc db/blob/blob_file_builder_test.cc + db/blob/blob_file_cache_test.cc db/blob/blob_file_garbage_test.cc db/blob/blob_file_reader_test.cc + db/blob/db_blob_basic_test.cc db/blob/db_blob_index_test.cc db/column_family_test.cc db/compact_files_test.cc diff --git a/Makefile b/Makefile index ef0d7a09de..68c3dc79c2 100644 --- a/Makefile +++ b/Makefile @@ -577,6 +577,7 @@ ifdef ASSERT_STATUS_CHECKED lru_cache_test \ blob_file_addition_test \ blob_file_builder_test \ + blob_file_cache_test \ blob_file_garbage_test \ blob_file_reader_test \ bloom_test \ @@ -588,6 +589,7 @@ ifdef ASSERT_STATUS_CHECKED crc32c_test \ dbformat_test \ db_basic_test \ + db_blob_basic_test \ db_flush_test \ db_with_timestamp_basic_test \ db_with_timestamp_compaction_test \ @@ -687,6 +689,7 @@ endif # Not necessarily well thought out or up-to-date, but matches old list TESTS_PLATFORM_DEPENDENT := \ db_basic_test \ + db_blob_basic_test \ db_with_timestamp_basic_test \ db_encryption_test \ db_test2 \ @@ -1462,6 +1465,9 @@ slice_transform_test: $(OBJ_DIR)/util/slice_transform_test.o $(TEST_LIBRARY) $(L db_basic_test: $(OBJ_DIR)/db/db_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_blob_basic_test: $(OBJ_DIR)/db/blob/db_blob_basic_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) @@ -1914,6 +1920,9 @@ blob_file_addition_test: $(OBJ_DIR)/db/blob/blob_file_addition_test.o $(TEST_LIB blob_file_builder_test: $(OBJ_DIR)/db/blob/blob_file_builder_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_file_cache_test: $(OBJ_DIR)/db/blob/blob_file_cache_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index ee6d99032a..d53d78f201 100644 --- a/TARGETS +++ b/TARGETS @@ -135,6 +135,7 @@ cpp_library( "db/arena_wrapped_db_iter.cc", "db/blob/blob_file_addition.cc", "db/blob/blob_file_builder.cc", + "db/blob/blob_file_cache.cc", "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", @@ -423,6 +424,7 @@ cpp_library( "db/arena_wrapped_db_iter.cc", "db/blob/blob_file_addition.cc", "db/blob/blob_file_builder.cc", + "db/blob/blob_file_cache.cc", "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_file_reader.cc", @@ -857,6 +859,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_file_cache_test", + "db/blob/blob_file_cache_test.cc", + "serial", + [], + [], + ], [ "blob_file_garbage_test", "db/blob/blob_file_garbage_test.cc", @@ -1095,6 +1104,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_blob_basic_test", + "db/blob/db_blob_basic_test.cc", + "serial", + [], + [], + ], [ "db_blob_index_test", "db/blob/db_blob_index_test.cc", diff --git a/cache/cache_helpers.h b/cache/cache_helpers.h new file mode 100644 index 0000000000..a7a5864a03 --- /dev/null +++ b/cache/cache_helpers.h @@ -0,0 +1,114 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cassert>
+
+#include "rocksdb/cache.h"
+#include "rocksdb/rocksdb_namespace.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Returns the cached value given a cache handle.
+template <typename T>
+T* GetFromCacheHandle(Cache* cache, Cache::Handle* handle) {
+  assert(cache);
+  assert(handle);
+
+  return static_cast<T*>(cache->Value(handle));
+}
+
+// Simple generic deleter for Cache (to be used with Cache::Insert).
+template <typename T>
+void DeleteCacheEntry(const Slice& /* key */, void* value) {
+  delete static_cast<T*>(value);
+}
+
+// Turns a T* into a Slice so it can be used as a key with Cache.
+template <typename T>
+Slice GetSlice(const T* t) {
+  return Slice(reinterpret_cast<const char*>(t), sizeof(T));
+}
+
+// Generic resource management object for cache handles that releases the
+// handle when destroyed. Has unique ownership of the handle, so copying it
+// is not allowed, while moving it transfers ownership.
+template <typename T>
+class CacheHandleGuard {
+ public:
+  CacheHandleGuard() = default;
+
+  CacheHandleGuard(Cache* cache, Cache::Handle* handle)
+      : cache_(cache),
+        handle_(handle),
+        value_(GetFromCacheHandle<T>(cache, handle)) {
+    assert(cache_ && handle_ && value_);
+  }
+
+  CacheHandleGuard(const CacheHandleGuard&) = delete;
+  CacheHandleGuard& operator=(const CacheHandleGuard&) = delete;
+
+  CacheHandleGuard(CacheHandleGuard&& rhs) noexcept
+      : cache_(rhs.cache_), handle_(rhs.handle_), value_(rhs.value_) {
+    assert((!cache_ && !handle_ && !value_) || (cache_ && handle_ && value_));
+
+    rhs.ResetFields();
+  }
+
+  CacheHandleGuard& operator=(CacheHandleGuard&& rhs) noexcept {
+    if (this == &rhs) {
+      return *this;
+    }
+
+    ReleaseHandle();
+
+    cache_ = rhs.cache_;
+    handle_ = rhs.handle_;
+    value_ = rhs.value_;
+
+    assert((!cache_ && !handle_ && !value_) || (cache_ && handle_ && value_));
+
+    rhs.ResetFields();
+
+    return *this;
+  }
+
+  ~CacheHandleGuard() { ReleaseHandle(); }
+
+  bool IsEmpty() const { return !handle_; }
+
+  Cache* GetCache() const { return cache_; }
+  Cache::Handle* GetCacheHandle() const { return handle_; }
+  T* GetValue() const { return value_; }
+
+  void Reset() {
+    ReleaseHandle();
+    ResetFields();
+  }
+
+ private:
+  void ReleaseHandle() {
+    if (IsEmpty()) {
+      return;
+    }
+
+    assert(cache_);
+    cache_->Release(handle_);
+  }
+
+  void ResetFields() {
+    cache_ = nullptr;
+    handle_ = nullptr;
+    value_ = nullptr;
+  }
+
+ private:
+  Cache* cache_ = nullptr;
+  Cache::Handle* handle_ = nullptr;
+  T* value_ = nullptr;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
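For orientation, here is a minimal usage sketch of the CacheHandleGuard helper defined above. It is illustrative only and not part of the patch; the Widget type and the Example function are hypothetical, while the Cache calls are the ones from rocksdb/cache.h that this header builds on.

// Illustrative sketch only (not part of the patch): RAII management of a
// cache handle using the helpers above. Widget and Example are hypothetical.
struct Widget {
  int x = 42;
};

void Example(Cache* cache) {
  const uint64_t id = 1;
  const Slice key = GetSlice(&id);

  Cache::Handle* handle = nullptr;
  const Status s = cache->Insert(key, new Widget(), /* charge */ 1,
                                 &DeleteCacheEntry<Widget>, &handle);
  if (!s.ok()) {
    return;
  }

  // The guard releases the cache handle when it goes out of scope; the cached
  // Widget itself is destroyed by the cache via DeleteCacheEntry<Widget>.
  CacheHandleGuard<Widget> guard(cache, handle);
  assert(guard.GetValue()->x == 42);
}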
diff --git a/db/blob/blob_file_cache.cc b/db/blob/blob_file_cache.cc
new file mode 100644
index 0000000000..0d0c292404
--- /dev/null
+++ b/db/blob/blob_file_cache.cc
@@ -0,0 +1,99 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_file_cache.h"
+
+#include <cassert>
+#include <memory>
+
+#include "db/blob/blob_file_reader.h"
+#include "options/cf_options.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/slice.h"
+#include "test_util/sync_point.h"
+#include "util/hash.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+BlobFileCache::BlobFileCache(Cache* cache,
+                             const ImmutableCFOptions* immutable_cf_options,
+                             const FileOptions* file_options,
+                             uint32_t column_family_id,
+                             HistogramImpl* blob_file_read_hist)
+    : cache_(cache),
+      mutex_(kNumberOfMutexStripes, GetSliceNPHash64),
+      immutable_cf_options_(immutable_cf_options),
+      file_options_(file_options),
+      column_family_id_(column_family_id),
+      blob_file_read_hist_(blob_file_read_hist) {
+  assert(cache_);
+  assert(immutable_cf_options_);
+  assert(file_options_);
+}
+
+Status BlobFileCache::GetBlobFileReader(
+    uint64_t blob_file_number,
+    CacheHandleGuard<BlobFileReader>* blob_file_reader) {
+  assert(blob_file_reader);
+  assert(blob_file_reader->IsEmpty());
+
+  const Slice key = GetSlice(&blob_file_number);
+
+  assert(cache_);
+
+  Cache::Handle* handle = cache_->Lookup(key);
+  if (handle) {
+    *blob_file_reader = CacheHandleGuard<BlobFileReader>(cache_, handle);
+    return Status::OK();
+  }
+
+  TEST_SYNC_POINT("BlobFileCache::GetBlobFileReader:DoubleCheck");
+
+  // Check again while holding mutex
+  MutexLock lock(mutex_.get(key));
+
+  handle = cache_->Lookup(key);
+  if (handle) {
+    *blob_file_reader = CacheHandleGuard<BlobFileReader>(cache_, handle);
+    return Status::OK();
+  }
+
+  assert(immutable_cf_options_);
+  Statistics* const statistics = immutable_cf_options_->statistics;
+
+  RecordTick(statistics, NO_FILE_OPENS);
+
+  std::unique_ptr<BlobFileReader> reader;
+
+  {
+    assert(file_options_);
+    const Status s = BlobFileReader::Create(
+        *immutable_cf_options_, *file_options_, column_family_id_,
+        blob_file_read_hist_, blob_file_number, &reader);
+    if (!s.ok()) {
+      RecordTick(statistics, NO_FILE_ERRORS);
+      return s;
+    }
+  }
+
+  {
+    constexpr size_t charge = 1;
+
+    const Status s = cache_->Insert(key, reader.get(), charge,
+                                    &DeleteCacheEntry<BlobFileReader>, &handle);
+    if (!s.ok()) {
+      RecordTick(statistics, NO_FILE_ERRORS);
+      return s;
+    }
+  }
+
+  reader.release();
+
+  *blob_file_reader = CacheHandleGuard<BlobFileReader>(cache_, handle);
+
+  return Status::OK();
+}
+
+}  // namespace ROCKSDB_NAMESPACE
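For context, a condensed sketch of how a caller consumes this cache; it mirrors the Version::GetBlob code added to db/version_set.cc later in this patch. Error handling is trimmed, and blob_file_cache, blob_index, read_options, user_key, and value are assumed from that surrounding context.

// Condensed caller-side sketch (mirrors Version::GetBlob below); assumes
// blob_file_cache, blob_index, read_options, user_key, and value from context.
CacheHandleGuard<BlobFileReader> blob_file_reader;
Status s = blob_file_cache->GetBlobFileReader(blob_index.file_number(),
                                              &blob_file_reader);
if (s.ok()) {
  assert(blob_file_reader.GetValue());
  s = blob_file_reader.GetValue()->GetBlob(
      read_options, user_key, blob_index.offset(), blob_index.size(),
      blob_index.compression(), value);
}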
diff --git a/db/blob/blob_file_cache.h b/db/blob/blob_file_cache.h
new file mode 100644
index 0000000000..09f09e87e2
--- /dev/null
+++ b/db/blob/blob_file_cache.h
@@ -0,0 +1,49 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cinttypes>
+
+#include "cache/cache_helpers.h"
+#include "rocksdb/rocksdb_namespace.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Cache;
+struct ImmutableCFOptions;
+struct FileOptions;
+class HistogramImpl;
+class Status;
+class BlobFileReader;
+class Slice;
+
+class BlobFileCache {
+ public:
+  BlobFileCache(Cache* cache, const ImmutableCFOptions* immutable_cf_options,
+                const FileOptions* file_options, uint32_t column_family_id,
+                HistogramImpl* blob_file_read_hist);
+
+  BlobFileCache(const BlobFileCache&) = delete;
+  BlobFileCache& operator=(const BlobFileCache&) = delete;
+
+  Status GetBlobFileReader(uint64_t blob_file_number,
+                           CacheHandleGuard<BlobFileReader>* blob_file_reader);
+
+ private:
+  Cache* cache_;
+  // Note: mutex_ below is used to guard against multiple threads racing to
+  // open the same file.
+  Striped<port::Mutex, Slice> mutex_;
+  const ImmutableCFOptions* immutable_cf_options_;
+  const FileOptions* file_options_;
+  uint32_t column_family_id_;
+  HistogramImpl* blob_file_read_hist_;
+
+  static constexpr size_t kNumberOfMutexStripes = 1 << 7;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
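The striped mutex declared above exists so that two threads only serialize when they race to open the same blob file (or hash to the same stripe). The following is a simplified sketch of the striping idea; it is not the actual Striped implementation from util/mutexlock.h, and it assumes <mutex>, <vector>, and util/hash.h are available.

// Simplified illustration of lock striping; the real type used above is
// Striped<port::Mutex, Slice> from util/mutexlock.h.
class StripedMutexSketch {
 public:
  explicit StripedMutexSketch(size_t num_stripes) : mutexes_(num_stripes) {}

  // Maps a key to one of the stripes, so unrelated keys rarely contend.
  std::mutex* Get(const Slice& key) {
    return &mutexes_[GetSliceNPHash64(key) % mutexes_.size()];
  }

 private:
  std::vector<std::mutex> mutexes_;
};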
diff --git a/db/blob/blob_file_cache_test.cc b/db/blob/blob_file_cache_test.cc
new file mode 100644
index 0000000000..214fe41c5e
--- /dev/null
+++ b/db/blob/blob_file_cache_test.cc
@@ -0,0 +1,268 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_file_cache.h"
+
+#include <cassert>
+#include <string>
+
+#include "db/blob/blob_log_format.h"
+#include "db/blob/blob_log_writer.h"
+#include "env/mock_env.h"
+#include "file/filename.h"
+#include "file/read_write_util.h"
+#include "file/writable_file_writer.h"
+#include "options/cf_options.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/options.h"
+#include "rocksdb/statistics.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+// Creates a test blob file with a single blob in it.
+void WriteBlobFile(uint32_t column_family_id,
+                   const ImmutableCFOptions& immutable_cf_options,
+                   uint64_t blob_file_number) {
+  assert(!immutable_cf_options.cf_paths.empty());
+
+  const std::string blob_file_path = BlobFileName(
+      immutable_cf_options.cf_paths.front().path, blob_file_number);
+
+  std::unique_ptr<FSWritableFile> file;
+  ASSERT_OK(NewWritableFile(immutable_cf_options.fs, blob_file_path, &file,
+                            FileOptions()));
+
+  std::unique_ptr<WritableFileWriter> file_writer(
+      new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
+                             immutable_cf_options.env));
+
+  constexpr Statistics* statistics = nullptr;
+  constexpr bool use_fsync = false;
+
+  BlobLogWriter blob_log_writer(std::move(file_writer),
+                                immutable_cf_options.env, statistics,
+                                blob_file_number, use_fsync);
+
+  constexpr bool has_ttl = false;
+  constexpr ExpirationRange expiration_range;
+
+  BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
+                       expiration_range);
+
+  ASSERT_OK(blob_log_writer.WriteHeader(header));
+
+  constexpr char key[] = "key";
+  constexpr char blob[] = "blob";
+
+  std::string compressed_blob;
+  Slice blob_to_write;
+
+  uint64_t key_offset = 0;
+  uint64_t blob_offset = 0;
+
+  ASSERT_OK(blob_log_writer.AddRecord(key, blob, &key_offset, &blob_offset));
+
+  BlobLogFooter footer;
+  footer.blob_count = 1;
+  footer.expiration_range = expiration_range;
+
+  std::string checksum_method;
+  std::string checksum_value;
+
+  ASSERT_OK(
+      blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
+}
+
+}  // anonymous namespace
+
+class BlobFileCacheTest : public testing::Test {
+ protected:
+  BlobFileCacheTest() : mock_env_(Env::Default()) {}
+
+  MockEnv mock_env_;
+};
+
+TEST_F(BlobFileCacheTest, GetBlobFileReader) {
+  Options options;
+  options.env = &mock_env_;
+  options.statistics = CreateDBStatistics();
+  options.cf_paths.emplace_back(
+      test::PerThreadDBPath(&mock_env_, "BlobFileCacheTest_GetBlobFileReader"),
+      0);
+  options.enable_blob_files = true;
+
+  constexpr uint32_t column_family_id = 1;
+  ImmutableCFOptions immutable_cf_options(options);
+  constexpr uint64_t blob_file_number = 123;
+
+  WriteBlobFile(column_family_id, immutable_cf_options, blob_file_number);
+
+  constexpr size_t capacity = 10;
+  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
+
+  FileOptions file_options;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  BlobFileCache blob_file_cache(backing_cache.get(), &immutable_cf_options,
+                                &file_options, column_family_id,
+                                blob_file_read_hist);
+
+  // First try: reader should be opened and put in cache
+  CacheHandleGuard<BlobFileReader> first;
+
+  ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &first));
+  ASSERT_NE(first.GetValue(), nullptr);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
+
+  // Second try: reader should be served from cache
+  CacheHandleGuard<BlobFileReader> second;
+
+  ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &second));
+  ASSERT_NE(second.GetValue(), nullptr);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
+
+  ASSERT_EQ(first.GetValue(), second.GetValue());
+}
+
+TEST_F(BlobFileCacheTest, GetBlobFileReader_Race) {
+  Options options;
+  options.env = &mock_env_;
+  options.statistics = CreateDBStatistics();
+  options.cf_paths.emplace_back(
+      test::PerThreadDBPath(&mock_env_,
+                            "BlobFileCacheTest_GetBlobFileReader_Race"),
+      0);
+  options.enable_blob_files = true;
+
+  constexpr uint32_t column_family_id = 1;
+  ImmutableCFOptions immutable_cf_options(options);
+  constexpr uint64_t blob_file_number = 123;
+
+  WriteBlobFile(column_family_id, immutable_cf_options, blob_file_number);
+
+  constexpr size_t capacity = 10;
+  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
+
+  FileOptions file_options;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  BlobFileCache blob_file_cache(backing_cache.get(), &immutable_cf_options,
+                                &file_options, column_family_id,
+                                blob_file_read_hist);
+
+  CacheHandleGuard<BlobFileReader> first;
+  CacheHandleGuard<BlobFileReader> second;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileCache::GetBlobFileReader:DoubleCheck", [&](void* /* arg */) {
+        // Disabling sync points to prevent infinite recursion
+        SyncPoint::GetInstance()->DisableProcessing();
+
+        ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &second));
+        ASSERT_NE(second.GetValue(), nullptr);
+        ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+        ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &first));
+  ASSERT_NE(first.GetValue(), nullptr);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
+
+  ASSERT_EQ(first.GetValue(), second.GetValue());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(BlobFileCacheTest, GetBlobFileReader_IOError) {
+  Options options;
+  options.env = &mock_env_;
+  options.statistics = CreateDBStatistics();
+  options.cf_paths.emplace_back(
+      test::PerThreadDBPath(&mock_env_,
+                            "BlobFileCacheTest_GetBlobFileReader_IOError"),
+      0);
+  options.enable_blob_files = true;
+
+  constexpr size_t capacity = 10;
+  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
+
+  ImmutableCFOptions immutable_cf_options(options);
+  FileOptions file_options;
+  constexpr uint32_t column_family_id = 1;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  BlobFileCache blob_file_cache(backing_cache.get(), &immutable_cf_options,
+                                &file_options, column_family_id,
+                                blob_file_read_hist);
+
+  // Note: there is no blob file with the below number
+  constexpr uint64_t blob_file_number = 123;
+
+  CacheHandleGuard<BlobFileReader> reader;
+
+  ASSERT_TRUE(
+      blob_file_cache.GetBlobFileReader(blob_file_number, &reader).IsIOError());
+  ASSERT_EQ(reader.GetValue(), nullptr);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 1);
+}
+
+TEST_F(BlobFileCacheTest, GetBlobFileReader_CacheFull) {
+  Options options;
+  options.env = &mock_env_;
+  options.statistics = CreateDBStatistics();
+  options.cf_paths.emplace_back(
+      test::PerThreadDBPath(&mock_env_,
+                            "BlobFileCacheTest_GetBlobFileReader_CacheFull"),
+      0);
+  options.enable_blob_files = true;
+
+  constexpr uint32_t column_family_id = 1;
+  ImmutableCFOptions immutable_cf_options(options);
+  constexpr uint64_t blob_file_number = 123;
+
+  WriteBlobFile(column_family_id, immutable_cf_options, blob_file_number);
+
+  constexpr size_t capacity = 0;
+  constexpr int num_shard_bits = -1;  // determined automatically
+  constexpr bool strict_capacity_limit = true;
+  std::shared_ptr<Cache> backing_cache =
+      NewLRUCache(capacity, num_shard_bits, strict_capacity_limit);
+
+  FileOptions file_options;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  BlobFileCache blob_file_cache(backing_cache.get(), &immutable_cf_options,
+                                &file_options, column_family_id,
+                                blob_file_read_hist);
+
+  // Insert into cache should fail since it has zero capacity and
+  // strict_capacity_limit is set
+  CacheHandleGuard<BlobFileReader> reader;
+
+  ASSERT_TRUE(blob_file_cache.GetBlobFileReader(blob_file_number, &reader)
+                  .IsIncomplete());
+  ASSERT_EQ(reader.GetValue(), nullptr);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
+  ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 1);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc
index e8af662f1e..71d5eadccd 100644
--- a/db/blob/blob_file_reader_test.cc
+++ b/db/blob/blob_file_reader_test.cc
@@ -18,6 +18,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/file_system.h"
 #include "rocksdb/options.h"
+#include "test_util/sync_point.h"
 #include "test_util/testharness.h"
 #include "util/compression.h"
 #include "utilities/fault_injection_env.h"
diff --git a/db/blob/blob_log_format.cc b/db/blob/blob_log_format.cc
index b5cd0bdcc7..482bd078e6 100644
--- a/db/blob/blob_log_format.cc
+++ b/db/blob/blob_log_format.cc
@@ -95,10 +95,6 @@ Status BlobLogFooter::DecodeFrom(Slice src) {
   return Status::OK();
 }
 
-uint64_t BlobLogRecord::CalculateAdjustmentForRecordHeader(uint64_t key_size) {
-  return key_size + kHeaderSize;
-}
-
 void BlobLogRecord::EncodeHeaderTo(std::string* dst) {
   assert(dst != nullptr);
   dst->clear();
diff --git a/db/blob/blob_log_format.h b/db/blob/blob_log_format.h
index afeb8d3709..539bbb5261 100644
--- a/db/blob/blob_log_format.h
+++ b/db/blob/blob_log_format.h
@@ -107,7 +107,9 @@ struct BlobLogRecord {
   // Note that the offset field of BlobIndex actually points to the blob value
   // as opposed to the start of the blob record. The following method can
   // be used to calculate the adjustment needed to read the blob record header.
-  static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size);
+  static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size) {
+    return key_size + kHeaderSize;
+  }
 
   uint64_t key_size = 0;
   uint64_t value_size = 0;
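To make the comment above concrete: a BlobIndex's offset points at the value portion of a blob record, so the record header is located by subtracting the adjustment. Illustrative sketch only; key_size and blob_index are assumed from the surrounding read path.

// Illustrative only: locating the start of a blob record given a BlobIndex
// that points at the value. key_size and blob_index come from context.
const uint64_t adjustment =
    BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size);
// adjustment == key_size + BlobLogRecord::kHeaderSize
const uint64_t record_offset = blob_index.offset() - adjustment;
// A read of BlobLogRecord::kHeaderSize + key_size + value_size bytes starting
// at record_offset covers the record header, the key, and the value.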
diff --git a/db/blob/blob_log_sequential_reader.cc b/db/blob/blob_log_sequential_reader.cc
index 58afd27a9c..0a0262ee78 100644
--- a/db/blob/blob_log_sequential_reader.cc
+++ b/db/blob/blob_log_sequential_reader.cc
@@ -6,8 +6,6 @@
 
 #include "db/blob/blob_log_sequential_reader.h"
 
-#include
-
 #include "file/random_access_file_reader.h"
 #include "monitoring/statistics.h"
 #include "util/stop_watch.h"
diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
new file mode 100644
index 0000000000..d5838af09d
--- /dev/null
+++ b/db/blob/db_blob_basic_test.cc
@@ -0,0 +1,183 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_index.h"
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "test_util/sync_point.h"
+#include "utilities/fault_injection_env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class DBBlobBasicTest : public DBTestBase {
+ protected:
+  DBBlobBasicTest()
+      : DBTestBase("/db_blob_basic_test", /* env_do_fsync */ false) {}
+};
+
+TEST_F(DBBlobBasicTest, GetBlob) {
+  Options options;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob_value[] = "blob_value";
+
+  ASSERT_OK(Put(key, blob_value));
+
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get(key), blob_value);
+
+  // Try again with no I/O allowed. The table and the necessary blocks should
+  // already be in their respective caches; however, the blob itself can only
+  // be read from the blob file, so the read should return Incomplete.
+  ReadOptions read_options;
+  read_options.read_tier = kBlockCacheTier;
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
+                  .IsIncomplete());
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
+  Options options;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+
+  // Fake a corrupt blob index.
+  const std::string blob_index("foobar");
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
+  constexpr uint64_t min_blob_size = 10;
+
+  Options options;
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob[] = "short";
+  static_assert(sizeof(blob) - 1 < min_blob_size,
+                "Blob too long to be inlined");
+
+  // Fake an inlined TTL blob index.
+  std::string blob_index;
+
+  constexpr uint64_t expiration = 1234567890;
+
+  BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+}
+
+TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
+  Options options;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+
+  // Fake a blob index referencing a non-existent blob file.
+  std::string blob_index;
+
+  constexpr uint64_t blob_file_number = 1000;
+  constexpr uint64_t offset = 1234;
+  constexpr uint64_t size = 5678;
+
+  BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
+                        kNoCompression);
+
+  WriteBatch batch;
+  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
+  ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+  ASSERT_OK(Flush());
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsCorruption());
+}
+
+class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
+                               public testing::WithParamInterface<std::string> {
+ protected:
+  DBBlobBasicIOErrorTest()
+      : fault_injection_env_(Env::Default()), sync_point_(GetParam()) {}
+  ~DBBlobBasicIOErrorTest() { Close(); }
+
+  FaultInjectionTestEnv fault_injection_env_;
+  std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileReader::OpenFile:NewRandomAccessFile",
+                            "BlobFileReader::GetBlob:ReadFromFile"}));
+
+TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
+  Options options;
+  options.env = &fault_injection_env_;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr char key[] = "key";
+  constexpr char blob_value[] = "blob_value";
+
+  ASSERT_OK(Put(key, blob_value));
+
+  ASSERT_OK(Flush());
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_.SetFilesystemActive(false,
+                                             Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  PinnableSlice result;
+  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
+                  .IsIOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/blob/db_blob_index_test.cc b/db/blob/db_blob_index_test.cc
index e7ceabd3e5..f7525c1d8e 100644
--- a/db/blob/db_blob_index_test.cc
+++ b/db/blob/db_blob_index_test.cc
@@ -73,6 +73,9 @@ class DBBlobIndexTest : public DBTestBase {
     if (s.IsNotFound()) {
       return "NOT_FOUND";
     }
+    if (s.IsCorruption()) {
+      return "CORRUPTION";
+    }
     if (s.IsNotSupported()) {
       return "NOT_SUPPORTED";
     }
@@ -153,8 +156,13 @@ TEST_F(DBBlobIndexTest, Write) {
   }
 }
 
-// Get should be able to return blob index if is_blob_index is provided,
-// otherwise return Status::NotSupported status.
+// Note: the following test case pertains to the StackableDB-based BlobDB
+// implementation. Get should be able to return blob index if is_blob_index is
+// provided; otherwise, it should return Status::NotSupported (when reading
+// from memtable) or Status::Corruption (when reading from SST). Reading from
+// SST returns Corruption because we can't differentiate between the
+// application accidentally opening the base DB of a stacked BlobDB and actual
+// corruption when using the integrated BlobDB.
TEST_F(DBBlobIndexTest, Get) { for (auto tier : kAllTiers) { DestroyAndReopen(GetTestOptions()); @@ -171,15 +179,22 @@ TEST_F(DBBlobIndexTest, Get) { ASSERT_EQ("value", GetImpl("key", &is_blob_index)); ASSERT_FALSE(is_blob_index); // Verify blob index - ASSERT_TRUE(Get("blob_key", &value).IsNotSupported()); - ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key")); + if (tier <= kImmutableMemtables) { + ASSERT_TRUE(Get("blob_key", &value).IsNotSupported()); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key")); + } else { + ASSERT_TRUE(Get("blob_key", &value).IsCorruption()); + ASSERT_EQ("CORRUPTION", GetImpl("blob_key")); + } ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index)); ASSERT_TRUE(is_blob_index); } } -// Get should NOT return Status::NotSupported if blob index is updated with -// a normal value. +// Note: the following test case pertains to the StackableDB-based BlobDB +// implementation. Get should NOT return Status::NotSupported/Status::Corruption +// if blob index is updated with a normal value. See the test case above for +// more details. TEST_F(DBBlobIndexTest, Updated) { for (auto tier : kAllTiers) { DestroyAndReopen(GetTestOptions()); @@ -206,7 +221,11 @@ TEST_F(DBBlobIndexTest, Updated) { ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot)); } ASSERT_EQ("new_value", Get("key1")); - ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2")); + if (tier <= kImmutableMemtables) { + ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2")); + } else { + ASSERT_EQ("CORRUPTION", GetImpl("key2")); + } ASSERT_EQ("NOT_FOUND", Get("key3")); ASSERT_EQ("NOT_FOUND", Get("key4")); ASSERT_EQ("a,b,c", GetImpl("key5")); diff --git a/db/column_family.cc b/db/column_family.cc index d0a16dd48c..db8c6f7787 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -16,6 +16,7 @@ #include #include +#include "db/blob/blob_file_cache.h" #include "db/compaction/compaction_picker.h" #include "db/compaction/compaction_picker_fifo.h" #include "db/compaction/compaction_picker_level.h" @@ -559,6 +560,10 @@ ColumnFamilyData::ColumnFamilyData( new InternalStats(ioptions_.num_levels, db_options.env, this)); table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache, block_cache_tracer, io_tracer)); + blob_file_cache_.reset( + new BlobFileCache(_table_cache, ioptions(), soptions(), id_, + internal_stats_->GetBlobFileReadHist())); + if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( new LevelCompactionPicker(ioptions_, &internal_comparator_)); diff --git a/db/column_family.h b/db/column_family.h index 0a251e5453..2102e0c619 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -44,6 +44,7 @@ class LogBuffer; class InstrumentedMutex; class InstrumentedMutexLock; struct SuperVersionContext; +class BlobFileCache; extern const double kIncSlowdownRatio; // This file contains a list of data structures for managing column family @@ -381,6 +382,7 @@ class ColumnFamilyData { SequenceNumber earliest_seq); TableCache* table_cache() const { return table_cache_.get(); } + BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } // See documentation in compaction_picker.h // REQUIRES: DB mutex held @@ -543,6 +545,7 @@ class ColumnFamilyData { const bool is_delete_range_supported_; std::unique_ptr table_cache_; + std::unique_ptr blob_file_cache_; std::unique_ptr internal_stats_; diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 6edbc0a000..7edc6738cf 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -470,9 +470,7 @@ TEST_F(DBFlushTest, 
FlushWithBlob) { ASSERT_OK(Flush()); ASSERT_EQ(Get("key1"), short_value); - - // TODO: enable once Get support is implemented for blobs - // ASSERT_EQ(Get("key2"), long_value); + ASSERT_EQ(Get("key2"), long_value); VersionSet* const versions = dbfull()->TEST_GetVersionSet(); assert(versions); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9d16132223..e4a57aff92 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -384,9 +384,7 @@ TEST_F(DBWALTest, RecoverWithBlob) { Reopen(options); ASSERT_EQ(Get("key1"), short_value); - - // TODO: enable once Get support is implemented for blobs - // ASSERT_EQ(Get("key2"), long_value); + ASSERT_EQ(Get("key2"), long_value); VersionSet* const versions = dbfull()->TEST_GetVersionSet(); assert(versions); diff --git a/db/internal_stats.h b/db/internal_stats.h index edb2c0582c..61d9b76575 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -673,6 +673,8 @@ class InternalStats { HistogramImpl* GetFileReadHist(int /*level*/) { return nullptr; } + HistogramImpl* GetBlobFileReadHist() { return nullptr; } + uint64_t GetBackgroundErrorCount() const { return 0; } uint64_t BumpAndGetBackgroundErrorCount() { return 0; } diff --git a/db/version_set.cc b/db/version_set.cc index 55562c84ed..6589b72e29 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -22,6 +22,9 @@ #include #include "compaction/compaction.h" +#include "db/blob/blob_file_cache.h" +#include "db/blob/blob_file_reader.h" +#include "db/blob/blob_index.h" #include "db/internal_stats.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -1757,6 +1760,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->statistics), table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()), + blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr), merge_operator_((cfd_ == nullptr) ? 
nullptr : cfd_->ioptions()->merge_operator), storage_info_( @@ -1780,6 +1784,55 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, version_number_(version_number), io_tracer_(io_tracer) {} +Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, + PinnableSlice* value) const { + assert(value); + + if (read_options.read_tier == kBlockCacheTier) { + return Status::Incomplete("Cannot read blob: no disk I/O allowed"); + } + + BlobIndex blob_index; + + { + Status s = blob_index.DecodeFrom(*value); + if (!s.ok()) { + return s; + } + } + + if (blob_index.HasTTL() || blob_index.IsInlined()) { + return Status::Corruption("Unexpected TTL/inlined blob index"); + } + + const auto& blob_files = storage_info_.GetBlobFiles(); + + const uint64_t blob_file_number = blob_index.file_number(); + + const auto it = blob_files.find(blob_file_number); + if (it == blob_files.end()) { + return Status::Corruption("Invalid blob file number"); + } + + CacheHandleGuard blob_file_reader; + + { + assert(blob_file_cache_); + const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number, + &blob_file_reader); + if (!s.ok()) { + return s; + } + } + + assert(blob_file_reader.GetValue()); + const Status s = blob_file_reader.GetValue()->GetBlob( + read_options, user_key, blob_index.offset(), blob_index.size(), + blob_index.compression(), value); + + return s; +} + void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, std::string* timestamp, Status* status, MergeContext* merge_context, @@ -1802,12 +1855,19 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, vset_->block_cache_tracer_->is_tracing_enabled()) { tracing_get_id = vset_->block_cache_tracer_->NextGetId(); } + + // Note: the old StackableDB-based BlobDB passes in + // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we + // need to provide it here. + bool is_blob_index = false; + bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index; + GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found, merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use, tracing_get_id); // Pin blocks that we read to hold merge operands @@ -1865,6 +1925,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, // TODO: update per-level perfcontext user_key_return_count for kMerge break; case GetContext::kFound: + if (is_blob_index) { + if (do_merge && value) { + *status = GetBlob(read_options, user_key, value); + if (!status->ok()) { + if (status->IsIncomplete()) { + get_context.MarkKeyMayExist(); + } + return; + } + } + } + if (fp.GetHitFileLevel() == 0) { RecordTick(db_statistics_, GET_HIT_L0); } else if (fp.GetHitFileLevel() == 1) { @@ -1882,7 +1954,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, case GetContext::kCorrupt: *status = Status::Corruption("corrupted key for ", user_key); return; - case GetContext::kBlobIndex: + case GetContext::kUnexpectedBlobIndex: ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); *status = Status::NotSupported( "Encounter unexpected blob index. 
Please open DB with " @@ -2069,7 +2141,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, Status::Corruption("corrupted key for ", iter->lkey->user_key()); file_range.MarkKeyDone(iter); continue; - case GetContext::kBlobIndex: + case GetContext::kUnexpectedBlobIndex: ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); *status = Status::NotSupported( "Encounter unexpected blob index. Please open DB with " @@ -3661,6 +3733,7 @@ VersionSet::VersionSet(const std::string& dbname, new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller, block_cache_tracer, io_tracer)), + table_cache_(table_cache), env_(_db_options->env), fs_(_db_options->fs, io_tracer), dbname_(dbname), @@ -3682,12 +3755,11 @@ VersionSet::VersionSet(const std::string& dbname, VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet - Cache* table_cache = column_family_set_->get_table_cache(); column_family_set_.reset(); for (auto& file : obsolete_files_) { if (file.metadata->table_reader_handle) { - table_cache->Release(file.metadata->table_reader_handle); - TableCache::Evict(table_cache, file.metadata->fd.GetNumber()); + table_cache_->Release(file.metadata->table_reader_handle); + TableCache::Evict(table_cache_, file.metadata->fd.GetNumber()); } file.DeleteMetadata(); } @@ -3697,11 +3769,10 @@ VersionSet::~VersionSet() { void VersionSet::Reset() { if (column_family_set_) { - Cache* table_cache = column_family_set_->get_table_cache(); WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); WriteController* wc = column_family_set_->write_controller(); column_family_set_.reset( - new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache, + new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_, wbm, wc, block_cache_tracer_, io_tracer_)); } db_id_.clear(); diff --git a/db/version_set.h b/db/version_set.h index 0d11163b89..eacfca44e3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -29,6 +29,7 @@ #include #include +#include "cache/cache_helpers.h" #include "db/blob/blob_file_meta.h" #include "db/column_family.h" #include "db/compaction/compaction.h" @@ -776,6 +777,13 @@ class Version { return storage_info_.user_comparator_; } + // Interprets *value as a blob reference, and (assuming the corresponding + // blob file is part of this Version) retrieves the blob and saves it in + // *value, replacing the blob reference. + // REQUIRES: *value stores an encoded blob reference + Status GetBlob(const ReadOptions& read_options, const Slice& user_key, + PinnableSlice* value) const; + // Returns true if the filter blocks in the specified level will not be // checked during read operations. 
In certain cases (trivial move or preload), // the filter block may already be cached, but we still do not access it such @@ -800,6 +808,7 @@ class Version { Logger* info_log_; Statistics* db_statistics_; TableCache* table_cache_; + BlobFileCache* blob_file_cache_; const MergeOperator* merge_operator_; VersionStorageInfo storage_info_; @@ -1150,6 +1159,10 @@ class VersionSet { void GetLiveFilesMetaData(std::vector *metadata); void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) { + assert(table_cache_); + + table_cache_->Erase(GetSlice(&blob_file_number)); + obsolete_blob_files_.emplace_back(blob_file_number, std::move(path)); } @@ -1265,6 +1278,7 @@ class VersionSet { WalSet wals_; std::unique_ptr column_family_set_; + Cache* table_cache_; Env* const env_; FileSystemPtr const fs_; const std::string dbname_; diff --git a/src.mk b/src.mk index af9cb0914d..8ee3c68f3c 100644 --- a/src.mk +++ b/src.mk @@ -7,6 +7,7 @@ LIB_SOURCES = \ db/arena_wrapped_db_iter.cc \ db/blob/blob_file_addition.cc \ db/blob/blob_file_builder.cc \ + db/blob/blob_file_cache.cc \ db/blob/blob_file_garbage.cc \ db/blob/blob_file_meta.cc \ db/blob/blob_file_reader.cc \ @@ -356,8 +357,10 @@ TEST_MAIN_SOURCES = \ cache/lru_cache_test.cc \ db/blob/blob_file_addition_test.cc \ db/blob/blob_file_builder_test.cc \ + db/blob/blob_file_cache_test.cc \ db/blob/blob_file_garbage_test.cc \ db/blob/blob_file_reader_test.cc \ + db/blob/db_blob_basic_test.cc \ db/blob/db_blob_index_test.cc \ db/column_family_test.cc \ db/compact_files_test.cc \ diff --git a/table/get_context.cc b/table/get_context.cc index ecd59220a7..78901dca48 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -245,7 +245,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { // Blob value not supported. Stop. - state_ = kBlobIndex; + state_ = kUnexpectedBlobIndex; return false; } if (kNotFound == state_) { diff --git a/table/get_context.h b/table/get_context.h index c349a3e6ff..f330580db8 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -71,7 +71,7 @@ class GetContext { kDeleted, kCorrupt, kMerge, // saver contains the current merge result (the operands) - kBlobIndex, + kUnexpectedBlobIndex, }; GetContextStats get_context_stats_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 4be5ee8d5e..0190f76320 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -1046,7 +1046,7 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { if (data.count(key) == 0) { ASSERT_TRUE(s.IsNotFound()); } else if (is_blob[i]) { - ASSERT_TRUE(s.IsNotSupported()); + ASSERT_TRUE(s.IsCorruption()); } else { ASSERT_OK(s); ASSERT_EQ(data[key], value);
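Taken together, these changes enable the integrated BlobDB read path that db_blob_basic_test, db_flush_test, and db_wal_test now exercise. A minimal end-to-end sketch using only the public API follows; the database path is a placeholder.

// Minimal end-to-end sketch of the newly enabled read path (public API only;
// the path below is a placeholder). Mirrors the expectations in the tests.
Options options;
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;  // store every value in a blob file

DB* db = nullptr;
Status s = DB::Open(options, "/tmp/blob_db_example", &db);
assert(s.ok());

s = db->Put(WriteOptions(), "key", "blob_value");
assert(s.ok());
s = db->Flush(FlushOptions());  // relocates the value to a blob file
assert(s.ok());

std::string value;
s = db->Get(ReadOptions(), "key", &value);  // resolved via BlobFileCache
assert(s.ok());
assert(value == "blob_value");

delete db;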