From eaaef9117820c2cc9bda1ccc8c53cc499f831055 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 17 Oct 2017 17:24:25 -0700 Subject: [PATCH] Blob DB: Store blob index as kTypeBlobIndex in base db Summary: Blob DB inserts the blob index into the base DB with type kTypeBlobIndex, to tell apart values written by plain RocksDB from those written by Blob DB. This makes it possible to migrate from an existing RocksDB to Blob DB. Also, with this patch, Blob DB garbage collection no longer uses OptimisticTransaction. Instead it uses a custom write callback to achieve behavior similar to OptimisticTransaction. This is because we need to pass the is_blob_index flag to DBImpl::GetImpl, but OptimisticTransaction doesn't support it. Closes https://github.com/facebook/rocksdb/pull/3000 Differential Revision: D6050044 Pulled By: yiwu-arbug fbshipit-source-id: 61dc72ab9977625e75f78cd968e7d8a3976e3632 --- db/db_impl.cc | 12 +- db/db_impl.h | 18 +- db/memtable_list.cc | 13 +- db/memtable_list.h | 8 +- utilities/blob_db/blob_db_impl.cc | 304 +++++++++++++++------------ utilities/blob_db/blob_db_impl.h | 71 ++----- utilities/blob_db/blob_db_iterator.h | 104 +++++++++ utilities/blob_db/blob_db_test.cc | 89 ++++++-- 8 files changed, 378 insertions(+), 241 deletions(-) create mode 100644 utilities/blob_db/blob_db_iterator.h diff --git a/db/db_impl.cc b/db/db_impl.cc index 8c8ababb6d..a061da8353 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2556,7 +2556,8 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv, #ifndef ROCKSDB_LITE Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool cache_only, SequenceNumber* seq, - bool* found_record_for_key) { + bool* found_record_for_key, + bool* is_blob_index) { Status s; MergeContext merge_context; RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), @@ -2571,7 +2572,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // Check if there is a record for this key in the latest memtable 
sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, - read_options); + read_options, nullptr /*read_callback*/, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -2590,7 +2591,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // Check if there is a record for this key in the immutable memtables sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, - read_options); + read_options, nullptr /*read_callback*/, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -2609,7 +2610,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // Check if there is a record for this key in the immutable memtables sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg, - seq, read_options); + seq, read_options, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. 
@@ -2633,7 +2634,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // Check tables sv->current->Get(read_options, lkey, nullptr, &s, &merge_context, &range_del_agg, nullptr /* value_found */, - found_record_for_key, seq); + found_record_for_key, seq, nullptr /*read_callback*/, + is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading SST files diff --git a/db/db_impl.h b/db/db_impl.h index 30866fba18..99abe2d349 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -96,6 +96,14 @@ class DBImpl : public DB { virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + + // Function that Get and KeyMayExist call with no_io true or false + // Note: 'value_found' from KeyMayExist propagates here + Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value, + bool* value_found = nullptr, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); + using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, @@ -295,7 +303,8 @@ class DBImpl : public DB { // TODO(andrewkr): this API need to be aware of range deletion operations Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool cache_only, SequenceNumber* seq, - bool* found_record_for_key); + bool* found_record_for_key, + bool* is_blob_index = nullptr); using DB::IngestExternalFile; virtual Status IngestExternalFile( @@ -1272,13 +1281,6 @@ class DBImpl : public DB { #endif // ROCKSDB_LITE - // Function that Get and KeyMayExist call with no_io true or false - // Note: 'value_found' from KeyMayExist propagates here - Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, PinnableSlice* value, - bool* value_found = nullptr, ReadCallback* callback = nullptr, - bool* is_blob_index = nullptr); - bool 
GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, bool is_locked, uint64_t* value); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 93bc192814..d8eb1fefc0 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -109,14 +109,13 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, seq, read_opts, callback, is_blob_index); } -bool MemTableListVersion::GetFromHistory(const LookupKey& key, - std::string* value, Status* s, - MergeContext* merge_context, - RangeDelAggregator* range_del_agg, - SequenceNumber* seq, - const ReadOptions& read_opts) { +bool MemTableListVersion::GetFromHistory( + const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) { return GetFromList(&memlist_history_, key, value, s, merge_context, - range_del_agg, seq, read_opts); + range_del_agg, seq, read_opts, nullptr /*read_callback*/, + is_blob_index); } bool MemTableListVersion::GetFromList( diff --git a/db/memtable_list.h b/db/memtable_list.h index 52833a2452..ca0fd9d77b 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -73,14 +73,16 @@ class MemTableListVersion { bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts); + const ReadOptions& read_opts, + bool* is_blob_index = nullptr); bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts) { + const ReadOptions& read_opts, + bool* is_blob_index = nullptr) { SequenceNumber seq; return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, - read_opts); + read_opts, is_blob_index); } Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, diff --git 
a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index f6c6dc6e2b..aefa2598b9 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -32,8 +32,7 @@ #include "util/random.h" #include "util/sync_point.h" #include "util/timer_queue.h" -#include "utilities/transactions/optimistic_transaction.h" -#include "utilities/transactions/optimistic_transaction_db_impl.h" +#include "utilities/blob_db/blob_db_iterator.h" namespace { int kBlockBasedTableVersionFormat = 2; @@ -78,7 +77,7 @@ class BlobHandle { void EncodeTo(std::string* dst) const; - Status DecodeFrom(Slice* input); + Status DecodeFrom(const Slice& input); void clear(); @@ -109,10 +108,12 @@ void BlobHandle::clear() { compression_ = kNoCompression; } -Status BlobHandle::DecodeFrom(Slice* input) { - if (GetVarint64(input, &file_number_) && GetVarint64(input, &offset_) && - GetVarint64(input, &size_)) { - compression_ = static_cast(input->data()[0]); +Status BlobHandle::DecodeFrom(const Slice& input) { + Slice s(input); + Slice* p = &s; + if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) && + GetVarint64(p, &size_)) { + compression_ = static_cast(p->data()[0]); return Status::OK(); } else { clear(); @@ -149,8 +150,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( value_type == CompactionEventListener::CompactionListenerValueType::kValue) { BlobHandle handle; - Slice lsmval(existing_value); - Status s = handle.DecodeFrom(&lsmval); + Status s = handle.DecodeFrom(existing_value); if (s.ok()) { if (impl_->debug_level_ >= 3) ROCKS_LOG_INFO(impl_->db_options_.info_log, @@ -211,8 +211,6 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) { env_ = db_->GetEnv(); - opt_db_.reset(new OptimisticTransactionDBImpl(db, false)); - Status s = env_->CreateDirIfMissing(blob_dir_); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, @@ -237,7 +235,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } 
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) : BlobDB(db), db_impl_(static_cast_with_check(db)), - opt_db_(new OptimisticTransactionDBImpl(db, false)), wo_set_(false), bdb_options_(blob_db_options), db_options_(db->GetOptions()), @@ -827,8 +824,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { extendTTL(&(bfile->ttl_range_), expiration); } - return WriteBatchInternal::Put(&updates_blob_, column_family_id, key, - index_entry); + return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id, + key, index_entry); } virtual Status DeleteCF(uint32_t column_family_id, @@ -997,18 +994,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, std::string headerbuf; Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); - // this is another more safer way to do it, where you keep the writeLock - // for the entire write path. this will increase latency and reduce - // throughput - // WriteLock lockbfile_w(&bfile->mutex_); - // std::shared_ptr writer = - // CheckOrCreateWriterLocked(bfile); - - if (debug_level_ >= 3) - ROCKS_LOG_DEBUG( - db_options_.info_log, ">Adding KEY FILE: %s: KEY: %s VALSZ: %d", - bfile->PathName().c_str(), key.ToString().c_str(), value.size()); - std::string index_entry; Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry); if (!s.ok()) { @@ -1022,20 +1007,25 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, } WriteBatch batch; - batch.Put(key, index_entry); + uint32_t column_family_id = + reinterpret_cast(DefaultColumnFamily())->GetID(); + s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key, + index_entry); // this goes to the base db and can be expensive - s = db_->Write(options, &batch); - - // this is the sequence number of the write. 
- SequenceNumber sn = WriteBatchInternal::Sequence(&batch); - bfile->ExtendSequenceRange(sn); - - if (expiration != kNoExpiration) { - extendTTL(&(bfile->ttl_range_), expiration); + if (s.ok()) { + s = db_->Write(options, &batch); } if (s.ok()) { + // this is the sequence number of the write. + SequenceNumber sn = WriteBatchInternal::Sequence(&batch); + bfile->ExtendSequenceRange(sn); + + if (expiration != kNoExpiration) { + extendTTL(&(bfile->ttl_range_), expiration); + } + s = CloseBlobFileIfNeeded(bfile); } @@ -1112,21 +1102,16 @@ std::vector BlobDBImpl::MultiGet( // fetch and index entry and reading from the file. ReadOptions ro(read_options); bool snapshot_created = SetSnapshotIfNeeded(&ro); - std::vector values_lsm; - values_lsm.resize(keys.size()); - auto statuses = db_->MultiGet(ro, keys, &values_lsm); - TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1"); - TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2"); - values->resize(keys.size()); - assert(statuses.size() == keys.size()); - assert(values_lsm.size() == keys.size()); - for (size_t i = 0; i < keys.size(); ++i) { - if (!statuses[i].ok()) { - continue; - } - Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i])); - statuses[i] = s; + std::vector statuses; + statuses.reserve(keys.size()); + values->clear(); + values->reserve(keys.size()); + PinnableSlice value; + for (size_t i = 0; i < keys.size(); i++) { + statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value)); + values->push_back(value.ToString()); + value.Reset(); } if (snapshot_created) { db_->ReleaseSnapshot(ro.snapshot); @@ -1143,12 +1128,11 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { return true; } -Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, - std::string* value) { +Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value) { assert(value != nullptr); - Slice index_entry_slice(index_entry); BlobHandle handle; 
- Status s = handle.DecodeFrom(&index_entry_slice); + Status s = handle.DecodeFrom(index_entry); if (!s.ok()) return s; // offset has to have certain min, as we will read CRC @@ -1179,9 +1163,8 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, bfile = hitr->second; } - // 0 - size - if (!handle.size() && value != nullptr) { - value->clear(); + if (handle.size() == 0 && value != nullptr) { + value->PinSelf(""); return Status::OK(); } @@ -1189,7 +1172,7 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, std::shared_ptr reader = GetOrOpenRandomAccessReader(bfile, env_, env_options_); - std::string* valueptr = value; + std::string* valueptr = value->GetSelf(); std::string value_c; if (bdb_options_.compression != kNoCompression) { valueptr = &value_c; @@ -1251,9 +1234,11 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, blob_value.data(), blob_value.size(), &contents, kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, *(cfh->cfd()->ioptions())); - *value = contents.data.ToString(); + *(value->GetSelf()) = contents.data.ToString(); } + value->PinSelf(); + return s; } @@ -1271,13 +1256,16 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, bool snapshot_created = SetSnapshotIfNeeded(&ro); Status s; - std::string index_entry; - s = db_->Get(ro, key, &index_entry); + bool is_blob_index = false; + s = db_impl_->GetImpl(ro, column_family, key, value, nullptr /*value_found*/, + nullptr /*read_callback*/, &is_blob_index); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); if (s.ok()) { - s = CommonGet(key, index_entry, value->GetSelf()); - value->PinSelf(); + if (is_blob_index) { + PinnableSlice index_entry = std::move(*value); + s = GetBlobValue(key, index_entry, value); + } } if (snapshot_created) { db_->ReleaseSnapshot(ro.snapshot); @@ -1285,15 +1273,6 @@ Status BlobDBImpl::Get(const ReadOptions& 
read_options, return s; } -Slice BlobDBIterator::value() const { - TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1"); - TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2"); - Slice index_entry = iter_->value(); - Status s = - db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_); - return Slice(vpart_); -} - std::pair BlobDBImpl::SanityCheck(bool aborted) { if (aborted) return std::make_pair(false, -1); @@ -1411,14 +1390,13 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, return true; } -bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& lsmValue) { - Slice val(lsmValue); +bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { BlobHandle handle; - Status s = handle.DecodeFrom(&val); + Status s = handle.DecodeFrom(index_entry); if (!s.ok()) { ROCKS_LOG_INFO(db_options_.info_log, "Could not parse lsm val in MarkBlobDeleted %s", - lsmValue.ToString().c_str()); + index_entry.ToString().c_str()); return false; } bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(), @@ -1618,7 +1596,52 @@ std::pair BlobDBImpl::WaStats(bool aborted) { return std::make_pair(true, -1); } -//////////////////////////////////////////////////////////////////////////////// +// Write callback for garbage collection to check if key has been updated +// since last read. Similar to how OptimisticTransaction works. See inline +// comment in GCFileAndUpdateLSM(). 
+class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback { + public: + GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key, + SequenceNumber upper_bound) + : cfd_(cfd), key_(key), upper_bound_(upper_bound) {} + + virtual Status Callback(DB* db) override { + auto* db_impl = reinterpret_cast(db); + auto* sv = db_impl->GetAndRefSuperVersion(cfd_); + SequenceNumber latest_seq = 0; + bool found_record_for_key = false; + bool is_blob_index = false; + Status s = db_impl->GetLatestSequenceForKey( + sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key, + &is_blob_index); + db_impl->ReturnAndCleanupSuperVersion(cfd_, sv); + if (!s.ok() && !s.IsNotFound()) { + // Error. + assert(!s.IsBusy()); + return s; + } + if (s.IsNotFound()) { + assert(!found_record_for_key); + return Status::Busy("Key deleted"); + } + assert(found_record_for_key); + assert(is_blob_index); + if (latest_seq > upper_bound_) { + return Status::Busy("Key overwritten"); + } + return s; + } + + virtual bool AllowWriteBatching() override { return false; } + + private: + ColumnFamilyData* cfd_; + // Key to check + Slice key_; + // Upper bound of sequence number to proceed. + SequenceNumber upper_bound_; +}; + // iterate over the blobs sequentially and check if the blob sequence number // is the latest. 
If it is the latest, preserve it, otherwise delete it // if it is TTL based, and the TTL has expired, then @@ -1631,7 +1654,6 @@ std::pair BlobDBImpl::WaStats(bool aborted) { // // if it is not TTL based, then we can blow the key if the key has been // DELETED in the LSM -//////////////////////////////////////////////////////////////////////////////// Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, GCStats* gc_stats) { uint64_t now = EpochNow(); @@ -1656,14 +1678,14 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, bool first_gc = bfptr->gc_once_after_open_; - ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_); + auto* cfh = bfptr->GetColumnFamily(db_); + auto* cfd = reinterpret_cast(cfh)->cfd(); + auto column_family_id = cfd->GetID(); bool has_ttl = header.HasTTL(); // this reads the key but skips the blob Reader::ReadLevel shallow = Reader::kReadHeaderKey; - assert(opt_db_); - bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second); bool no_relocation_lsmdel = false; @@ -1683,59 +1705,52 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, BlobLogRecord record; std::shared_ptr newfile; std::shared_ptr new_writer; - Transaction* transaction = nullptr; uint64_t blob_offset = 0; - bool retry = false; - - static const WriteOptions kGarbageCollectionWriteOptions = []() { - WriteOptions write_options; - // It is ok to ignore column families that were dropped. - write_options.ignore_missing_column_families = true; - return write_options; - }(); while (true) { assert(s.ok()); - if (retry) { - // Retry in case transaction fail with Status::TryAgain. - retry = false; - } else { - // Read the next blob record. - Status read_record_status = - reader->ReadRecord(&record, shallow, &blob_offset); - // Exit if we reach the end of blob file. - // TODO(yiwu): properly handle ReadRecord error. 
- if (!read_record_status.ok()) { - break; - } - gc_stats->blob_count++; - } - transaction = - opt_db_->BeginTransaction(kGarbageCollectionWriteOptions, - OptimisticTransactionOptions(), transaction); - - std::string index_entry; - Status get_status = transaction->GetForUpdate(ReadOptions(), cfh, - record.Key(), &index_entry); - TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate"); - if (get_status.IsNotFound()) { - // Key has been deleted. Drop the blob record. - continue; + // Read the next blob record. + Status read_record_status = + reader->ReadRecord(&record, shallow, &blob_offset); + // Exit if we reach the end of blob file. + // TODO(yiwu): properly handle ReadRecord error. + if (!read_record_status.ok()) { + break; } - if (!get_status.ok()) { + gc_stats->blob_count++; + + // Similar to OptimisticTransaction, we obtain latest_seq from + // base DB, which is guaranteed to be no smaller than the sequence of + // current key. We use a WriteCallback on write to check the key sequence + // on write. If the key sequence is larger than latest_seq, we know + // a new version is inserted and the old blob can be discarded. + // + // We cannot use OptimisticTransaction because we need to pass + // is_blob_index flag to GetImpl. + SequenceNumber latest_seq = GetLatestSequenceNumber(); + bool is_blob_index = false; + PinnableSlice index_entry; + Status get_status = db_impl_->GetImpl( + ReadOptions(), cfh, record.Key(), &index_entry, nullptr /*value_found*/, + nullptr /*read_callback*/, &is_blob_index); + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); + if (!get_status.ok() && !get_status.ok()) { + // error s = get_status; ROCKS_LOG_ERROR(db_options_.info_log, "Error while getting index entry: %s", s.ToString().c_str()); break; } + if (get_status.IsNotFound() || !is_blob_index) { + // Either the key is deleted or updated with a newer version which is + // inlined in LSM. 
+ continue; + } - // TODO(yiwu): We should have an override of GetForUpdate returning a - // PinnableSlice. - Slice index_entry_slice(index_entry); BlobHandle handle; - s = handle.DecodeFrom(&index_entry_slice); + s = handle.DecodeFrom(index_entry); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Error while decoding index entry: %s", @@ -1748,21 +1763,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, continue; } + GarbageCollectionWriteCallback callback(cfd, record.Key(), latest_seq); + // If key has expired, remove it from base DB. if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) { gc_stats->num_deletes++; gc_stats->deleted_size += record.GetBlobSize(); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); - transaction->Delete(cfh, record.Key()); - Status delete_status = transaction->Commit(); + WriteBatch delete_batch; + Status delete_status = delete_batch.Delete(record.Key()); + if (delete_status.ok()) { + delete_status = db_impl_->WriteWithCallback(WriteOptions(), + &delete_batch, &callback); + } if (delete_status.ok()) { gc_stats->delete_succeeded++; } else if (delete_status.IsBusy()) { // The key is overwritten in the meanwhile. Drop the blob record. gc_stats->overwritten_while_delete++; - } else if (delete_status.IsTryAgain()) { - // Retry the transaction. - retry = true; } else { // We hit an error. 
s = delete_status; @@ -1829,29 +1847,27 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size(); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); - transaction->Put(cfh, record.Key(), new_index_entry); - Status put_status = transaction->Commit(); - if (put_status.ok()) { + WriteBatch rewrite_batch; + Status rewrite_status = WriteBatchInternal::PutBlobIndex( + &rewrite_batch, column_family_id, record.Key(), new_index_entry); + if (rewrite_status.ok()) { + rewrite_status = db_impl_->WriteWithCallback(WriteOptions(), + &rewrite_batch, &callback); + } + if (rewrite_status.ok()) { gc_stats->relocate_succeeded++; - } else if (put_status.IsBusy()) { + } else if (rewrite_status.IsBusy()) { // The key is overwritten in the meanwhile. Drop the blob record. gc_stats->overwritten_while_relocate++; - } else if (put_status.IsTryAgain()) { - // Retry the transaction. - // TODO(yiwu): On retry, we can reuse the new blob record. - retry = true; } else { // We hit an error. - s = put_status; + s = rewrite_status; ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s", s.ToString().c_str()); break; } } // end of ReadRecord loop - if (transaction != nullptr) { - delete transaction; - } ROCKS_LOG_INFO( db_options_.info_log, "%s blob file %" PRIu64 @@ -2195,12 +2211,20 @@ std::pair BlobDBImpl::RunGC(bool aborted) { } Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { + auto* cfd = + reinterpret_cast(DefaultColumnFamily())->cfd(); // Get a snapshot to avoid blob file get deleted between we // fetch and index entry and reading from the file. 
- ReadOptions ro(read_options); - bool snapshot_created = SetSnapshotIfNeeded(&ro); - return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created, - ro.snapshot); + ManagedSnapshot* own_snapshot = nullptr; + const Snapshot* snapshot = read_options.snapshot; + if (snapshot == nullptr) { + own_snapshot = new ManagedSnapshot(db_); + snapshot = own_snapshot->snapshot(); + } + auto* iter = db_impl_->NewIteratorImpl( + read_options, cfd, snapshot->GetSequenceNumber(), + nullptr /*read_callback*/, true /*allow_blob*/); + return new BlobDBIterator(own_snapshot, iter, this); } Status DestroyBlobDB(const std::string& dbname, const Options& options, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 5654d05e56..6496c585d3 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -18,6 +18,7 @@ #include #include +#include "db/db_iter.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/listener.h" @@ -37,7 +38,6 @@ namespace rocksdb { class DBImpl; class ColumnFamilyHandle; class ColumnFamilyData; -class OptimisticTransactionDBImpl; struct FlushJobInfo; namespace blob_db { @@ -215,9 +215,20 @@ class BlobDBImpl : public BlobDB { Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + Status GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + using BlobDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& read_options) override; + using BlobDB::NewIterators; + virtual Status NewIterators( + const ReadOptions& read_options, + const std::vector& column_families, + std::vector* iterators) override { + return Status::NotSupported("Not implemented"); + } + using BlobDB::MultiGet; virtual std::vector MultiGet( const ReadOptions& read_options, @@ -269,15 +280,14 @@ class BlobDBImpl : public BlobDB { #endif // !NDEBUG private: + class 
GarbageCollectionWriteCallback; + Status OpenPhase1(); // Create a snapshot if there isn't one in read options. // Return true if a snapshot is created. bool SetSnapshotIfNeeded(ReadOptions* read_options); - Status CommonGet(const Slice& key, const std::string& index_entry, - std::string* value); - Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; @@ -416,10 +426,6 @@ class BlobDBImpl : public BlobDB { Env* env_; TTLExtractor* ttl_extractor_; - // Optimistic Transaction DB used during Garbage collection - // for atomicity - std::unique_ptr opt_db_; - // a boolean to capture whether write_options has been set std::atomic wo_set_; WriteOptions write_options_; @@ -527,55 +533,6 @@ class BlobDBImpl : public BlobDB { uint32_t debug_level_; }; -class BlobDBIterator : public Iterator { - public: - explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot, - const Snapshot* snapshot) - : iter_(iter), - db_impl_(impl), - own_snapshot_(own_snapshot), - snapshot_(snapshot) { - assert(iter != nullptr); - assert(snapshot != nullptr); - } - - ~BlobDBIterator() { - if (own_snapshot_) { - db_impl_->ReleaseSnapshot(snapshot_); - } - delete iter_; - } - - bool Valid() const override { return iter_->Valid(); } - - void SeekToFirst() override { iter_->SeekToFirst(); } - - void SeekToLast() override { iter_->SeekToLast(); } - - void Seek(const Slice& target) override { iter_->Seek(target); } - - void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); } - - void Next() override { iter_->Next(); } - - void Prev() override { iter_->Prev(); } - - Slice key() const override { return iter_->key(); } - - Slice value() const override; - - Status status() const override { return iter_->status(); } - - // Iterator::Refresh() not supported. 
- - private: - Iterator* iter_; - BlobDBImpl* db_impl_; - bool own_snapshot_; - const Snapshot* snapshot_; - mutable std::string vpart_; -}; - } // namespace blob_db } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h new file mode 100644 index 0000000000..c8aa1ff17e --- /dev/null +++ b/utilities/blob_db/blob_db_iterator.h @@ -0,0 +1,104 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/iterator.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { +namespace blob_db { + +using rocksdb::ManagedSnapshot; + +class BlobDBIterator : public Iterator { + public: + BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter, + BlobDBImpl* blob_db) + : snapshot_(snapshot), iter_(iter), blob_db_(blob_db) {} + + virtual ~BlobDBIterator() = default; + + bool Valid() const override { + if (!iter_->Valid()) { + return false; + } + return status_.ok(); + } + + Status status() const override { + if (!iter_->status().ok()) { + return iter_->status(); + } + return status_; + } + + void SeekToFirst() override { + iter_->SeekToFirst(); + UpdateBlobValue(); + } + + void SeekToLast() override { + iter_->SeekToLast(); + UpdateBlobValue(); + } + + void Seek(const Slice& target) override { + iter_->Seek(target); + UpdateBlobValue(); + } + + void SeekForPrev(const Slice& target) override { + iter_->SeekForPrev(target); + UpdateBlobValue(); + } + + void Next() override { + assert(Valid()); + iter_->Next(); + UpdateBlobValue(); + } + + void Prev() override { + assert(Valid()); + iter_->Prev(); + UpdateBlobValue(); + } + + Slice key() const override { + assert(Valid()); + return iter_->key(); + } + + Slice value() const 
override { + assert(Valid()); + if (!iter_->IsBlob()) { + return iter_->value(); + } + return value_; + } + + // Iterator::Refresh() not supported. + + private: + void UpdateBlobValue() { + TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1"); + TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2"); + value_.Reset(); + if (iter_->Valid() && iter_->IsBlob()) { + status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + } + } + + std::unique_ptr snapshot_; + std::unique_ptr iter_; + BlobDBImpl* blob_db_; + Status status_; + PinnableSlice value_; +}; +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index a31e90e0c6..535b86a6e9 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -88,9 +88,14 @@ class BlobDBTest : public testing::Test { void PutRandom(const std::string &key, Random *rnd, std::map *data = nullptr) { + PutRandom(blob_db_, key, rnd, data); + } + + void PutRandom(DB *db, const std::string &key, Random *rnd, + std::map *data = nullptr) { int len = rnd->Next() % kMaxBlobSize + 1; std::string value = test::RandomHumanReadableString(rnd, len); - ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value))); + ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); if (data != nullptr) { (*data)[key] = value; } @@ -116,9 +121,12 @@ class BlobDBTest : public testing::Test { } // Verify blob db contain expected data and nothing more. - // TODO(yiwu): Verify blob files are consistent with data in LSM. 
void VerifyDB(const std::map &data) { - Iterator *iter = blob_db_->NewIterator(ReadOptions()); + VerifyDB(blob_db_, data); + } + + void VerifyDB(DB *db, const std::map &data) { + Iterator *iter = db->NewIterator(ReadOptions()); iter->SeekToFirst(); for (auto &p : data) { ASSERT_TRUE(iter->Valid()); @@ -593,7 +601,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate", + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", "BlobDBImpl::PutUntil:Start"}, {"BlobDBImpl::PutUntil:Finish", "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}}); @@ -630,7 +638,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { mock_env_->set_now_micros(300 * 1000000); SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate", + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", "BlobDBImpl::PutUntil:Start"}, {"BlobDBImpl::PutUntil:Finish", "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}}); @@ -687,7 +695,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { TEST_F(BlobDBTest, ReadWhileGC) { // run the same test for Get(), MultiGet() and Iterator each. 
- for (int i = 0; i < 3; i++) { + for (int i = 0; i < 2; i++) { BlobDBOptions bdb_options; bdb_options.disable_background_tasks = true; Open(bdb_options); @@ -710,17 +718,10 @@ TEST_F(BlobDBTest, ReadWhileGC) { break; case 1: SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBImpl::MultiGet:AfterIndexEntryGet:1", + {{"BlobDBIterator::UpdateBlobValue:Start:1", "BlobDBTest::ReadWhileGC:1"}, {"BlobDBTest::ReadWhileGC:2", - "BlobDBImpl::MultiGet:AfterIndexEntryGet:2"}}); - break; - case 2: - SyncPoint::GetInstance()->LoadDependency( - {{"BlobDBIterator::value:BeforeGetBlob:1", - "BlobDBTest::ReadWhileGC:1"}, - {"BlobDBTest::ReadWhileGC:2", - "BlobDBIterator::value:BeforeGetBlob:2"}}); + "BlobDBIterator::UpdateBlobValue:Start:2"}}); break; } SyncPoint::GetInstance()->EnableProcessing(); @@ -735,12 +736,6 @@ TEST_F(BlobDBTest, ReadWhileGC) { ASSERT_EQ("bar", value); break; case 1: - statuses = blob_db_->MultiGet(ReadOptions(), {"foo"}, &values); - ASSERT_EQ(1, statuses.size()); - ASSERT_EQ(1, values.size()); - ASSERT_EQ("bar", values[0]); - break; - case 2: // VerifyDB use iterator to scan the DB. VerifyDB({{"foo", "bar"}}); break; @@ -834,6 +829,58 @@ TEST_F(BlobDBTest, GetLiveFilesMetaData) { VerifyDB(data); } +TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { + constexpr size_t kNumKey = 20; + constexpr size_t kNumIteration = 10; + Random rnd(301); + std::map data; + std::vector is_blob(kNumKey, false); + + // Write to plain rocksdb. + Options options; + options.create_if_missing = true; + DB *db = nullptr; + ASSERT_OK(DB::Open(options, dbname_, &db)); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + ToString(key_index); + PutRandom(db, key, &rnd, &data); + } + VerifyDB(db, data); + delete db; + db = nullptr; + + // Open as blob db. Verify it can read existing data. 
+ Open(); + VerifyDB(blob_db_, data); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + ToString(key_index); + is_blob[key_index] = true; + PutRandom(blob_db_, key, &rnd, &data); + } + VerifyDB(blob_db_, data); + delete blob_db_; + blob_db_ = nullptr; + + // Verify plain db returns an error for keys written by blob db. + ASSERT_OK(DB::Open(options, dbname_, &db)); + std::string value; + for (size_t i = 0; i < kNumKey; i++) { + std::string key = "key" + ToString(i); + Status s = db->Get(ReadOptions(), key, &value); + if (data.count(key) == 0) { + ASSERT_TRUE(s.IsNotFound()); + } else if (is_blob[i]) { + ASSERT_TRUE(s.IsNotSupported()); + } else { + ASSERT_OK(s); + ASSERT_EQ(data[key], value); + } + } + delete db; +} + } // namespace blob_db } // namespace rocksdb