diff --git a/CMakeLists.txt b/CMakeLists.txt index b5fed942d3..31f9618dfb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -615,6 +615,7 @@ set(SOURCES db/blob/blob_log_format.cc db/blob/blob_log_sequential_reader.cc db/blob/blob_log_writer.cc + db/blob/blob_source.cc db/blob/prefetch_buffer_collection.cc db/builder.cc db/c.cc @@ -1217,6 +1218,7 @@ if(WITH_TESTS) db/blob/blob_file_garbage_test.cc db/blob/blob_file_reader_test.cc db/blob/blob_garbage_meter_test.cc + db/blob/blob_source_test.cc db/blob/db_blob_basic_test.cc db/blob/db_blob_compaction_test.cc db/blob/db_blob_corruption_test.cc diff --git a/Makefile b/Makefile index 278ad8c291..e005c61027 100644 --- a/Makefile +++ b/Makefile @@ -1863,6 +1863,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_source_test: $(OBJ_DIR)/db/blob/blob_source_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index a6556d901e..e15dc7e311 100644 --- a/TARGETS +++ b/TARGETS @@ -30,6 +30,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/blob_source.cc", "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", @@ -359,6 +360,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/blob/blob_log_format.cc", "db/blob/blob_log_sequential_reader.cc", "db/blob/blob_log_writer.cc", + "db/blob/blob_source.cc", "db/blob/prefetch_buffer_collection.cc", "db/builder.cc", "db/c.cc", @@ -4800,6 +4802,12 @@ cpp_unittest_wrapper(name="blob_garbage_meter_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="blob_source_test", + srcs=["db/blob/blob_source_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="block_based_table_reader_test", srcs=["table/block_based/block_based_table_reader_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc new file mode 100644 index 0000000000..d8e8cfb18e --- /dev/null +++ b/db/blob/blob_source.cc @@ -0,0 +1,169 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_source.h" + +#include +#include + +#include "db/blob/blob_file_reader.h" +#include "options/cf_options.h" + +namespace ROCKSDB_NAMESPACE { + +BlobSource::BlobSource(const ImmutableOptions* immutable_options, + const std::string& db_id, + const std::string& db_session_id, + BlobFileCache* blob_file_cache) + : db_id_(db_id), + db_session_id_(db_session_id), + statistics_(immutable_options->statistics.get()), + blob_file_cache_(blob_file_cache), + blob_cache_(immutable_options->blob_cache) {} + +BlobSource::~BlobSource() = default; + +Status BlobSource::GetBlobFromCache(const Slice& cache_key, + CachableEntry* blob) const { + assert(blob); + assert(blob->IsEmpty()); + assert(blob_cache_); + assert(!cache_key.empty()); + + Cache::Handle* cache_handle = nullptr; + cache_handle = GetEntryFromCache(cache_key); + if (cache_handle != nullptr) { + blob->SetCachedValue( + static_cast(blob_cache_->Value(cache_handle)), + blob_cache_.get(), cache_handle); + return Status::OK(); + } + + assert(blob->IsEmpty()); + + return Status::NotFound("Blob not found in cache"); +} + +Status BlobSource::PutBlobIntoCache(const Slice& cache_key, + CachableEntry* cached_blob, + PinnableSlice* blob) const { + assert(blob); + assert(!cache_key.empty()); + assert(blob_cache_); + + Status s; + const Cache::Priority priority = Cache::Priority::LOW; + + // Objects to be put into the cache have to be heap-allocated and + // self-contained, i.e. own their contents. The Cache has to be able to take + // unique ownership of them. Therefore, we copy the blob into a string + // directly, and insert that into the cache. + std::string* buf = new std::string(); + buf->assign(blob->data(), blob->size()); + + // TODO: support custom allocators and provide a better estimated memory + // usage using malloc_usable_size. + Cache::Handle* cache_handle = nullptr; + s = InsertEntryIntoCache(cache_key, buf, buf->size(), &cache_handle, + priority); + if (s.ok()) { + assert(cache_handle != nullptr); + cached_blob->SetCachedValue(buf, blob_cache_.get(), cache_handle); + } + + return s; +} + +Status BlobSource::GetBlob(const ReadOptions& read_options, + const Slice& user_key, uint64_t file_number, + uint64_t offset, uint64_t file_size, + uint64_t value_size, + CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, + PinnableSlice* value, uint64_t* bytes_read) { + assert(value); + + Status s; + + const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); + + CachableEntry blob_entry; + + // First, try to get the blob from the cache + // + // If blob cache is enabled, we'll try to read from it. + if (blob_cache_) { + Slice key = cache_key.AsSlice(); + s = GetBlobFromCache(key, &blob_entry); + if (s.ok() && blob_entry.GetValue()) { + assert(blob_entry.GetValue()->size() == value_size); + if (bytes_read) { + *bytes_read = value_size; + } + value->PinSelf(*blob_entry.GetValue()); + return s; + } + } + + assert(blob_entry.IsEmpty()); + + const bool no_io = read_options.read_tier == kBlockCacheTier; + if (no_io) { + return Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); + } + + // Can't find the blob from the cache. Since I/O is allowed, read from the + // file. + { + CacheHandleGuard blob_file_reader; + s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader); + if (!s.ok()) { + return s; + } + + assert(blob_file_reader.GetValue()); + + if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) { + return Status::Corruption("Compression type mismatch when reading blob"); + } + + s = blob_file_reader.GetValue()->GetBlob( + read_options, user_key, offset, value_size, compression_type, + prefetch_buffer, value, bytes_read); + if (!s.ok()) { + return s; + } + } + + if (blob_cache_ && read_options.fill_cache) { + // If filling cache is allowed and a cache is configured, try to put the + // blob to the cache. + Slice key = cache_key.AsSlice(); + s = PutBlobIntoCache(key, &blob_entry, value); + if (!s.ok()) { + return s; + } + } + + assert(s.ok()); + return s; +} + +bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, + uint64_t offset) const { + const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); + const Slice key = cache_key.AsSlice(); + + CachableEntry blob_entry; + const Status s = GetBlobFromCache(key, &blob_entry); + + if (s.ok() && blob_entry.GetValue() != nullptr) { + return true; + } + + return false; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h new file mode 100644 index 0000000000..ae4991e5fe --- /dev/null +++ b/db/blob/blob_source.h @@ -0,0 +1,88 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include "cache/cache_helpers.h" +#include "cache/cache_key.h" +#include "db/blob/blob_file_cache.h" +#include "include/rocksdb/cache.h" +#include "rocksdb/rocksdb_namespace.h" +#include "table/block_based/cachable_entry.h" + +namespace ROCKSDB_NAMESPACE { + +struct ImmutableOptions; +class Status; +class FilePrefetchBuffer; +class Slice; + +// BlobSource is a class that provides universal access to blobs, regardless of +// whether they are in the blob cache, secondary cache, or (remote) storage. +// Depending on user settings, it always fetch blobs from multi-tier cache and +// storage with minimal cost. +class BlobSource { + public: + BlobSource(const ImmutableOptions* immutable_options, + const std::string& db_id, const std::string& db_session_id, + BlobFileCache* blob_file_cache); + + BlobSource(const BlobSource&) = delete; + BlobSource& operator=(const BlobSource&) = delete; + + ~BlobSource(); + + Status GetBlob(const ReadOptions& read_options, const Slice& user_key, + uint64_t file_number, uint64_t offset, uint64_t file_size, + uint64_t value_size, CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, + uint64_t* bytes_read); + + bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size, + uint64_t offset) const; + + private: + Status GetBlobFromCache(const Slice& cache_key, + CachableEntry* blob) const; + + Status PutBlobIntoCache(const Slice& cache_key, + CachableEntry* cached_blob, + PinnableSlice* blob) const; + + inline CacheKey GetCacheKey(uint64_t file_number, uint64_t file_size, + uint64_t offset) const { + OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number, + file_size); + return base_cache_key.WithOffset(offset); + } + + inline Cache::Handle* GetEntryFromCache(const Slice& key) const { + return blob_cache_->Lookup(key, statistics_); + } + + inline Status InsertEntryIntoCache(const Slice& key, std::string* value, + size_t charge, + Cache::Handle** cache_handle, + Cache::Priority priority) const { + return blob_cache_->Insert(key, value, charge, + &DeleteCacheEntry, cache_handle, + priority); + } + + const std::string db_id_; + const std::string db_session_id_; + + Statistics* statistics_; + + // A cache to store blob file reader. + BlobFileCache* blob_file_cache_; + + // A cache to store uncompressed blobs. + std::shared_ptr blob_cache_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc new file mode 100644 index 0000000000..55f451d779 --- /dev/null +++ b/db/blob/blob_source_test.cc @@ -0,0 +1,298 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_source.h" + +#include +#include +#include +#include +#include + +#include "db/blob/blob_file_cache.h" +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "db/db_test_util.h" +#include "file/filename.h" +#include "file/read_write_util.h" +#include "options/cf_options.h" +#include "rocksdb/options.h" +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { + +// Creates a test blob file with `num` blobs in it. +void WriteBlobFile(const ImmutableOptions& immutable_options, + uint32_t column_family_id, bool has_ttl, + const ExpirationRange& expiration_range_header, + const ExpirationRange& expiration_range_footer, + uint64_t blob_file_number, const std::vector& keys, + const std::vector& blobs, CompressionType compression, + std::vector& blob_offsets, + std::vector& blob_sizes) { + assert(!immutable_options.cf_paths.empty()); + size_t num = keys.size(); + assert(num == blobs.size()); + assert(num == blob_offsets.size()); + assert(num == blob_sizes.size()); + + const std::string blob_file_path = + BlobFileName(immutable_options.cf_paths.front().path, blob_file_number); + std::unique_ptr file; + ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file, + FileOptions())); + + std::unique_ptr file_writer(new WritableFileWriter( + std::move(file), blob_file_path, FileOptions(), immutable_options.clock)); + + constexpr Statistics* statistics = nullptr; + constexpr bool use_fsync = false; + constexpr bool do_flush = false; + + BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock, + statistics, blob_file_number, use_fsync, + do_flush); + + BlobLogHeader header(column_family_id, compression, has_ttl, + expiration_range_header); + + ASSERT_OK(blob_log_writer.WriteHeader(header)); + + std::vector compressed_blobs(num); + std::vector blobs_to_write(num); + if (kNoCompression == compression) { + for (size_t i = 0; i < num; ++i) { + blobs_to_write[i] = blobs[i]; + blob_sizes[i] = blobs[i].size(); + } + } else { + CompressionOptions opts; + CompressionContext context(compression); + constexpr uint64_t sample_for_compression = 0; + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + compression, sample_for_compression); + + constexpr uint32_t compression_format_version = 2; + + for (size_t i = 0; i < num; ++i) { + ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version, + &compressed_blobs[i])); + blobs_to_write[i] = compressed_blobs[i]; + blob_sizes[i] = compressed_blobs[i].size(); + } + } + + for (size_t i = 0; i < num; ++i) { + uint64_t key_offset = 0; + ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset, + &blob_offsets[i])); + } + + BlobLogFooter footer; + footer.blob_count = num; + footer.expiration_range = expiration_range_footer; + + std::string checksum_method; + std::string checksum_value; + ASSERT_OK( + blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value)); +} + +} // anonymous namespace + +class BlobSourceTest : public DBTestBase { + protected: + public: + explicit BlobSourceTest() + : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {} +}; + +TEST_F(BlobSourceTest, GetBlobsFromCache) { + Options options; + options.env = env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0); + options.enable_blob_files = true; + + LRUCacheOptions co; + co.capacity = 2048; + co.num_shard_bits = 2; + co.metadata_charge_policy = kDontChargeCacheMetadata; + options.blob_cache = NewLRUCache(co); + options.lowest_used_cache_tier = CacheTier::kVolatileTier; + + Reopen(options); + + std::string db_id; + ASSERT_OK(db_->GetDbIdentity(db_id)); + + std::string db_session_id; + ASSERT_OK(db_->GetDbSessionId(db_session_id)); + + ImmutableOptions immutable_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr size_t num_blobs = 16; + + std::vector key_strs; + std::vector blob_strs; + + for (size_t i = 0; i < num_blobs; ++i) { + key_strs.push_back("key" + std::to_string(i)); + blob_strs.push_back("blob" + std::to_string(i)); + } + + std::vector keys; + std::vector blobs; + + uint64_t file_size = BlobLogHeader::kSize; + for (size_t i = 0; i < num_blobs; ++i) { + keys.push_back({key_strs[i]}); + blobs.push_back({blob_strs[i]}); + file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size(); + } + file_size += BlobLogFooter::kSize; + + std::vector blob_offsets(keys.size()); + std::vector blob_sizes(keys.size()); + + WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range, + expiration_range, blob_file_number, keys, blobs, kNoCompression, + blob_offsets, blob_sizes); + + constexpr size_t capacity = 1024; + std::shared_ptr backing_cache = + NewLRUCache(capacity); // Blob file cache + + FileOptions file_options; + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr blob_file_cache(new BlobFileCache( + backing_cache.get(), &immutable_options, &file_options, column_family_id, + blob_file_read_hist, nullptr /*IOTracer*/)); + + BlobSource blob_source(&immutable_options, db_id, db_session_id, + blob_file_cache.get()); + + ReadOptions read_options; + read_options.verify_checksums = true; + + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + + { + // GetBlob + std::vector values(keys.size()); + uint64_t bytes_read = 0; + + read_options.fill_cache = false; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read)); + ASSERT_EQ(values[i], blobs[i]); + ASSERT_EQ(bytes_read, + blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); + + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + read_options.fill_cache = true; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read)); + ASSERT_EQ(values[i], blobs[i]); + ASSERT_EQ(bytes_read, + blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); + + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + read_options.fill_cache = true; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read)); + ASSERT_EQ(values[i], blobs[i]); + ASSERT_EQ(bytes_read, blob_sizes[i]); + + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + // Cache-only GetBlob + read_options.read_tier = ReadTier::kBlockCacheTier; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read)); + ASSERT_EQ(values[i], blobs[i]); + ASSERT_EQ(bytes_read, blob_sizes[i]); + + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + } + + options.blob_cache->EraseUnRefEntries(); + + { + // Cache-only GetBlob + std::vector values(keys.size()); + uint64_t bytes_read = 0; + + read_options.read_tier = ReadTier::kBlockCacheTier; + read_options.fill_cache = true; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_NOK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, + &values[i], &bytes_read)); + ASSERT_TRUE(values[i].empty()); + ASSERT_EQ(bytes_read, 0); + + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 6ec2e33320..48093811e1 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -290,7 +290,7 @@ class Cache { virtual const char* Name() const = 0; // Insert a mapping from key->value into the volatile cache only - // and assign it // the specified charge against the total cache capacity. + // and assign it with the specified charge against the total cache capacity. // If strict_capacity_limit is true and cache reaches its full capacity, // return Status::Incomplete. // @@ -394,8 +394,8 @@ class Cache { // memory - call this only if you're shutting down the process. // Any attempts of using cache after this call will fail terribly. // Always delete the DB object before calling this method! - virtual void DisownData(){ - // default implementation is noop + virtual void DisownData() { + // default implementation is noop } struct ApplyToAllEntriesOptions { diff --git a/src.mk b/src.mk index d8cf582962..c2bd53dbb3 100644 --- a/src.mk +++ b/src.mk @@ -21,6 +21,7 @@ LIB_SOURCES = \ db/blob/blob_log_format.cc \ db/blob/blob_log_sequential_reader.cc \ db/blob/blob_log_writer.cc \ + db/blob/blob_source.cc \ db/blob/prefetch_buffer_collection.cc \ db/builder.cc \ db/c.cc \ @@ -419,6 +420,7 @@ TEST_MAIN_SOURCES = \ db/blob/blob_file_garbage_test.cc \ db/blob/blob_file_reader_test.cc \ db/blob/blob_garbage_meter_test.cc \ + db/blob/blob_source_test.cc \ db/blob/db_blob_basic_test.cc \ db/blob/db_blob_compaction_test.cc \ db/blob/db_blob_corruption_test.cc \