From 5421c9728b0ccc31437a954001c60c5ad5fce208 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Tue, 10 Nov 2015 12:58:39 -0800 Subject: [PATCH 1/9] Make use of portable `uint64_t` type to make possible file access in 64-bit. Currently, a signed off_t type is being used for the following interfaces for both offset and the length in bytes: * `Allocate` * `RangeSync` On Linux `off_t` is automatically either 32 or 64-bit depending on the platform. On Windows it is always a 32-bit signed long which limits file access and in particular space pre-allocation to effectively 2 Gb. Proposal is to replace off_t with uint64_t as a portable type always access files with 64-bit interfaces. May need to modify posix code but lack resources to test it. --- include/rocksdb/env.h | 12 ++++++------ port/win/env_win.cc | 10 ++-------- util/env_test.cc | 4 ++-- util/file_reader_writer.cc | 2 +- util/file_reader_writer.h | 2 +- util/file_reader_writer_test.cc | 4 ++-- util/io_posix.cc | 21 +++++++++++++++------ util/io_posix.h | 6 +++--- 8 files changed, 32 insertions(+), 29 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index bbc2de579c..eb811ad02d 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -570,7 +570,7 @@ class WritableFile { // This asks the OS to initiate flushing the cached data to disk, // without waiting for completion. // Default implementation does nothing. - virtual Status RangeSync(off_t offset, off_t nbytes) { return Status::OK(); } + virtual Status RangeSync(uint64_t offset, uint64_t nbytes) { return Status::OK(); } // PrepareWrite performs any necessary preparation for a write // before the write actually occurs. This allows for pre-allocation @@ -590,8 +590,8 @@ class WritableFile { if (new_last_preallocated_block > last_preallocated_block_) { size_t num_spanned_blocks = new_last_preallocated_block - last_preallocated_block_; - Allocate(static_cast(block_size * last_preallocated_block_), - static_cast(block_size * num_spanned_blocks)); + Allocate(block_size * last_preallocated_block_, + block_size * num_spanned_blocks); last_preallocated_block_ = new_last_preallocated_block; } } @@ -600,7 +600,7 @@ class WritableFile { /* * Pre-allocate space for a file. */ - virtual Status Allocate(off_t offset, off_t len) { + virtual Status Allocate(uint64_t offset, uint64_t len) { return Status::OK(); } @@ -920,10 +920,10 @@ class WritableFileWrapper : public WritableFile { } protected: - Status Allocate(off_t offset, off_t len) override { + Status Allocate(uint64_t offset, uint64_t len) override { return target_->Allocate(offset, len); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status RangeSync(uint64_t offset, uint64_t nbytes) override { return target_->RangeSync(offset, nbytes); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 95796554f0..9f81d9db22 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -61,12 +61,6 @@ ThreadStatusUpdater* CreateThreadStatusUpdater() { return new ThreadStatusUpdater(); } -// A wrapper for fadvise, if the platform doesn't support fadvise, -// it will simply return Status::NotSupport. -int Fadvise(int fd, off_t offset, size_t len, int advice) { - return 0; // simply do nothing. -} - inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { return Status::IOError(context, GetWindowsErrSz(err)); } @@ -605,7 +599,7 @@ class WinMmapFile : public WritableFile { return Status::OK(); } - virtual Status Allocate(off_t offset, off_t len) override { + virtual Status Allocate(uint64_t offset, uint64_t len) override { return Status::OK(); } }; @@ -1053,7 +1047,7 @@ class WinWritableFile : public WritableFile { return filesize_; } - virtual Status Allocate(off_t offset, off_t len) override { + virtual Status Allocate(uint64_t offset, uint64_t len) override { Status status; TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); diff --git a/util/env_test.cc b/util/env_test.cc index 7f5e4b93b1..e5fa37099c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -971,11 +971,11 @@ TEST_F(EnvPosixTest, WritableFileWrapper) { } protected: - Status Allocate(off_t offset, off_t len) override { + Status Allocate(uint64_t offset, uint64_t len) override { inc(11); return Status::OK(); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status RangeSync(uint64_t offset, uint64_t nbytes) override { inc(12); return Status::OK(); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index f5c1788966..6d548c449d 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -248,7 +248,7 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) { return s; } -Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) { +Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); return writable_file_->RangeSync(offset, nbytes); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 720979099f..c10cde2abe 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -162,7 +162,7 @@ class WritableFileWriter { Status WriteUnbuffered(); // Normal write Status WriteBuffered(const char* data, size_t size); - Status RangeSync(off_t offset, off_t nbytes); + Status RangeSync(uint64_t offset, uint64_t nbytes); size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 370f523926..69b8cfea88 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -47,8 +47,8 @@ TEST_F(WritableFileWriterTest, RangeSync) { } protected: - Status Allocate(off_t offset, off_t len) override { return Status::OK(); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status Allocate(uint64_t offset, uint64_t len) override { return Status::OK(); } + Status RangeSync(uint64_t offset, uint64_t nbytes) override { EXPECT_EQ(offset % 4096, 0u); EXPECT_EQ(nbytes % 4096, 0u); diff --git a/util/io_posix.cc b/util/io_posix.cc index 0854ab013f..dd41e2a035 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -478,12 +478,15 @@ Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) { } #ifdef ROCKSDB_FALLOCATE_PRESENT -Status PosixMmapFile::Allocate(off_t offset, off_t len) { +Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds); int alloc_status = 0; if (allow_fallocate_) { alloc_status = fallocate( - fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); } if (alloc_status == 0) { return Status::OK(); @@ -606,13 +609,16 @@ Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) { } #ifdef ROCKSDB_FALLOCATE_PRESENT -Status PosixWritableFile::Allocate(off_t offset, off_t len) { +Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds); IOSTATS_TIMER_GUARD(allocate_nanos); int alloc_status = 0; if (allow_fallocate_) { alloc_status = fallocate( - fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); } if (alloc_status == 0) { return Status::OK(); @@ -621,8 +627,11 @@ Status PosixWritableFile::Allocate(off_t offset, off_t len) { } } -Status PosixWritableFile::RangeSync(off_t offset, off_t nbytes) { - if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { +Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) { + assert(offset <= std::numeric_limits::max()); + assert(nbytes <= std::numeric_limits::max()); + if (sync_file_range(fd_, static_cast(offset), + static_cast(nbytes), SYNC_FILE_RANGE_WRITE) == 0) { return Status::OK(); } else { return IOError(filename_, errno); diff --git a/util/io_posix.h b/util/io_posix.h index 0e5da39d6b..2a45d10ffe 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -90,8 +90,8 @@ class PosixWritableFile : public WritableFile { virtual uint64_t GetFileSize() override; virtual Status InvalidateCache(size_t offset, size_t length) override; #ifdef ROCKSDB_FALLOCATE_PRESENT - virtual Status Allocate(off_t offset, off_t len) override; - virtual Status RangeSync(off_t offset, off_t nbytes) override; + virtual Status Allocate(uint64_t offset, uint64_t len) override; + virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override; virtual size_t GetUniqueId(char* id, size_t max_size) const override; #endif }; @@ -157,7 +157,7 @@ class PosixMmapFile : public WritableFile { virtual uint64_t GetFileSize() override; virtual Status InvalidateCache(size_t offset, size_t length) override; #ifdef ROCKSDB_FALLOCATE_PRESENT - virtual Status Allocate(off_t offset, off_t len) override; + virtual Status Allocate(uint64_t offset, uint64_t len) override; #endif }; From 631863c63b7c992c19e9209f25ec1a16964dd3d5 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Fri, 6 Nov 2015 07:03:30 -0800 Subject: [PATCH 2/9] track WriteBatch contents Summary: Parallel writes will only be possible for certain combinations of flags and WriteBatch contents. Traversing the WriteBatch at write time to check these conditions would be expensive, but it is very cheap to keep track of when building WriteBatch-es. When loading WriteBatch-es during recovery, a deferred computation state is used so that the flags never need to be computed. Test Plan: 1. add asserts and EXPECT_EQ-s 2. make check Reviewers: sdong, igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D50337 --- db/write_batch.cc | 151 +++++++++++++++++++++++++++++++++- db/write_batch_test.cc | 12 +++ include/rocksdb/write_batch.h | 27 +++++- 3 files changed, 184 insertions(+), 6 deletions(-) diff --git a/db/write_batch.cc b/db/write_batch.cc index 53431b92a0..925a05efdd 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -42,22 +42,91 @@ namespace rocksdb { +// anon namespace for file-local types +namespace { + +enum ContentFlags : uint32_t { + DEFERRED = 1, + HAS_PUT = 2, + HAS_DELETE = 4, + HAS_SINGLE_DELETE = 8, + HAS_MERGE = 16, +}; + +struct BatchContentClassifier : public WriteBatch::Handler { + uint32_t content_flags = 0; + + Status PutCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_PUT; + return Status::OK(); + } + + Status DeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_DELETE; + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_SINGLE_DELETE; + return Status::OK(); + } + + Status MergeCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_MERGE; + return Status::OK(); + } +}; + +} // anon namespace + // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. static const size_t kHeader = 12; struct SavePoint { size_t size; // size of rep_ int count; // count of elements in rep_ - SavePoint(size_t s, int c) : size(s), count(c) {} + uint32_t content_flags; }; struct SavePoints { std::stack stack; }; -WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) { +WriteBatch::WriteBatch(size_t reserved_bytes) + : save_points_(nullptr), content_flags_(0), rep_() { rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); - Clear(); + rep_.resize(kHeader); +} + +WriteBatch::WriteBatch(const std::string& rep) + : save_points_(nullptr), + content_flags_(ContentFlags::DEFERRED), + rep_(rep) {} + +WriteBatch::WriteBatch(const WriteBatch& src) + : save_points_(src.save_points_), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + rep_(src.rep_) {} + +WriteBatch::WriteBatch(WriteBatch&& src) + : save_points_(std::move(src.save_points_)), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + rep_(std::move(src.rep_)) {} + +WriteBatch& WriteBatch::operator=(const WriteBatch& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(src); + } + return *this; +} + +WriteBatch& WriteBatch::operator=(WriteBatch&& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(std::move(src)); + } + return *this; } WriteBatch::~WriteBatch() { @@ -81,6 +150,8 @@ void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); + content_flags_.store(0, std::memory_order_relaxed); + if (save_points_ != nullptr) { while (!save_points_->stack.empty()) { save_points_->stack.pop(); @@ -92,6 +163,38 @@ int WriteBatch::Count() const { return WriteBatchInternal::Count(this); } +uint32_t WriteBatch::ComputeContentFlags() const { + auto rv = content_flags_.load(std::memory_order_relaxed); + if ((rv & ContentFlags::DEFERRED) != 0) { + BatchContentClassifier classifier; + Iterate(&classifier); + rv = classifier.content_flags; + + // this method is conceptually const, because it is performing a lazy + // computation that doesn't affect the abstract state of the batch. + // content_flags_ is marked mutable so that we can perform the + // following assignment + content_flags_.store(rv, std::memory_order_relaxed); + } + return rv; +} + +bool WriteBatch::HasPut() const { + return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; +} + +bool WriteBatch::HasDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0; +} + +bool WriteBatch::HasSingleDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0; +} + +bool WriteBatch::HasMerge() const { + return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0; +} + Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob) { @@ -169,21 +272,29 @@ Status WriteBatch::Iterate(Handler* handler) const { switch (tag) { case kTypeColumnFamilyValue: case kTypeValue: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); s = handler->PutCF(column_family, key, value); found++; break; case kTypeColumnFamilyDeletion: case kTypeDeletion: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); s = handler->DeleteCF(column_family, key); found++; break; case kTypeColumnFamilySingleDeletion: case kTypeSingleDeletion: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); s = handler->SingleDeleteCF(column_family, key); found++; break; case kTypeColumnFamilyMerge: case kTypeMerge: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); s = handler->MergeCF(column_family, key, value); found++; break; @@ -233,6 +344,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); } void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, @@ -251,6 +365,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); } void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, @@ -268,6 +385,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); } void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { @@ -284,6 +404,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSliceParts(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); } void WriteBatch::Delete(ColumnFamilyHandle* column_family, @@ -301,6 +424,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); } void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, @@ -318,6 +444,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSliceParts(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); } void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, @@ -336,6 +465,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); } void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, @@ -355,6 +487,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); } void WriteBatch::Merge(ColumnFamilyHandle* column_family, @@ -374,7 +509,8 @@ void WriteBatch::SetSavePoint() { save_points_ = new SavePoints(); } // Record length and count of current batch of writes. - save_points_->stack.push(SavePoint(GetDataSize(), Count())); + save_points_->stack.push(SavePoint{ + GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)}); } Status WriteBatch::RollbackToSavePoint() { @@ -387,6 +523,7 @@ Status WriteBatch::RollbackToSavePoint() { save_points_->stack.pop(); assert(savepoint.size <= rep_.size()); + assert(savepoint.count <= Count()); if (savepoint.size == rep_.size()) { // No changes to rollback @@ -396,6 +533,7 @@ Status WriteBatch::RollbackToSavePoint() { } else { rep_.resize(savepoint.size); WriteBatchInternal::SetCount(this, savepoint.count); + content_flags_.store(savepoint.content_flags, std::memory_order_relaxed); } return Status::OK(); @@ -670,12 +808,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); + b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); + dst->content_flags_.store( + dst->content_flags_.load(std::memory_order_relaxed) | + src->content_flags_.load(std::memory_order_relaxed), + std::memory_order_relaxed); } } // namespace rocksdb diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 4f73c82c80..62830da48b 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -39,6 +39,10 @@ static std::string PrintContents(WriteBatch* b) { ColumnFamilyMemTablesDefault cf_mems_default(mem); Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; + int put_count = 0; + int delete_count = 0; + int single_delete_count = 0; + int merge_count = 0; Arena arena; ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -53,18 +57,21 @@ static std::string PrintContents(WriteBatch* b) { state.append(iter->value().ToString()); state.append(")"); count++; + put_count++; break; case kTypeDeletion: state.append("Delete("); state.append(ikey.user_key.ToString()); state.append(")"); count++; + delete_count++; break; case kTypeSingleDeletion: state.append("SingleDelete("); state.append(ikey.user_key.ToString()); state.append(")"); count++; + single_delete_count++; break; case kTypeMerge: state.append("Merge("); @@ -73,6 +80,7 @@ static std::string PrintContents(WriteBatch* b) { state.append(iter->value().ToString()); state.append(")"); count++; + merge_count++; break; default: assert(false); @@ -81,6 +89,10 @@ static std::string PrintContents(WriteBatch* b) { state.append("@"); state.append(NumberToString(ikey.sequence)); } + EXPECT_EQ(b->HasPut(), put_count > 0); + EXPECT_EQ(b->HasDelete(), delete_count > 0); + EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0); + EXPECT_EQ(b->HasMerge(), merge_count > 0); if (!s.ok()) { state.append(s.ToString()); } else if (count != WriteBatchInternal::Count(b)) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 584e7347a4..f4a7ac06ea 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -25,6 +25,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ +#include #include #include #include @@ -201,17 +202,39 @@ class WriteBatch : public WriteBatchBase { // Returns the number of updates in the batch int Count() const; + // Returns true if PutCF will be called during Iterate + bool HasPut() const; + + // Returns true if DeleteCF will be called during Iterate + bool HasDelete() const; + + // Returns true if SingleDeleteCF will be called during Iterate + bool HasSingleDelete() const; + + // Returns trie if MergeCF will be called during Iterate + bool HasMerge() const; + using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } // Constructor with a serialized string object - explicit WriteBatch(const std::string& rep) - : save_points_(nullptr), rep_(rep) {} + explicit WriteBatch(const std::string& rep); + + WriteBatch(const WriteBatch& src); + WriteBatch(WriteBatch&& src); + WriteBatch& operator=(const WriteBatch& src); + WriteBatch& operator=(WriteBatch&& src); private: friend class WriteBatchInternal; SavePoints* save_points_; + // For HasXYZ. Mutable to allow lazy computation of results + mutable std::atomic content_flags_; + + // Performs deferred computation of content_flags if necessary + uint32_t ComputeContentFlags() const; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ From 5270b33bd38762376268395a3e243086953518ae Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Tue, 10 Nov 2015 17:03:42 -0800 Subject: [PATCH 3/9] Make use of portable `uint64_t` type to make possible file access in 64-bit. Currently, a signed off_t type is being used for the following interfaces for both offset and the length in bytes: * `Allocate` * `RangeSync` On Linux `off_t` is automatically either 32 or 64-bit depending on the platform. On Windows it is always a 32-bit signed long which limits file access and in particular space pre-allocation to effectively 2 Gb. Proposal is to replace off_t with uint64_t as a portable type always access files with 64-bit interfaces. May need to modify posix code but lack resources to test it. --- include/rocksdb/env.h | 12 ++++++------ port/win/env_win.cc | 10 ++-------- util/env_test.cc | 4 ++-- util/file_reader_writer.cc | 2 +- util/file_reader_writer.h | 2 +- util/file_reader_writer_test.cc | 4 ++-- util/io_posix.cc | 21 +++++++++++++++------ util/io_posix.h | 6 +++--- 8 files changed, 32 insertions(+), 29 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index bbc2de579c..eb811ad02d 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -570,7 +570,7 @@ class WritableFile { // This asks the OS to initiate flushing the cached data to disk, // without waiting for completion. // Default implementation does nothing. - virtual Status RangeSync(off_t offset, off_t nbytes) { return Status::OK(); } + virtual Status RangeSync(uint64_t offset, uint64_t nbytes) { return Status::OK(); } // PrepareWrite performs any necessary preparation for a write // before the write actually occurs. This allows for pre-allocation @@ -590,8 +590,8 @@ class WritableFile { if (new_last_preallocated_block > last_preallocated_block_) { size_t num_spanned_blocks = new_last_preallocated_block - last_preallocated_block_; - Allocate(static_cast(block_size * last_preallocated_block_), - static_cast(block_size * num_spanned_blocks)); + Allocate(block_size * last_preallocated_block_, + block_size * num_spanned_blocks); last_preallocated_block_ = new_last_preallocated_block; } } @@ -600,7 +600,7 @@ class WritableFile { /* * Pre-allocate space for a file. */ - virtual Status Allocate(off_t offset, off_t len) { + virtual Status Allocate(uint64_t offset, uint64_t len) { return Status::OK(); } @@ -920,10 +920,10 @@ class WritableFileWrapper : public WritableFile { } protected: - Status Allocate(off_t offset, off_t len) override { + Status Allocate(uint64_t offset, uint64_t len) override { return target_->Allocate(offset, len); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status RangeSync(uint64_t offset, uint64_t nbytes) override { return target_->RangeSync(offset, nbytes); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 95796554f0..9f81d9db22 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -61,12 +61,6 @@ ThreadStatusUpdater* CreateThreadStatusUpdater() { return new ThreadStatusUpdater(); } -// A wrapper for fadvise, if the platform doesn't support fadvise, -// it will simply return Status::NotSupport. -int Fadvise(int fd, off_t offset, size_t len, int advice) { - return 0; // simply do nothing. -} - inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { return Status::IOError(context, GetWindowsErrSz(err)); } @@ -605,7 +599,7 @@ class WinMmapFile : public WritableFile { return Status::OK(); } - virtual Status Allocate(off_t offset, off_t len) override { + virtual Status Allocate(uint64_t offset, uint64_t len) override { return Status::OK(); } }; @@ -1053,7 +1047,7 @@ class WinWritableFile : public WritableFile { return filesize_; } - virtual Status Allocate(off_t offset, off_t len) override { + virtual Status Allocate(uint64_t offset, uint64_t len) override { Status status; TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); diff --git a/util/env_test.cc b/util/env_test.cc index 7f5e4b93b1..e5fa37099c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -971,11 +971,11 @@ TEST_F(EnvPosixTest, WritableFileWrapper) { } protected: - Status Allocate(off_t offset, off_t len) override { + Status Allocate(uint64_t offset, uint64_t len) override { inc(11); return Status::OK(); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status RangeSync(uint64_t offset, uint64_t nbytes) override { inc(12); return Status::OK(); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index f5c1788966..6d548c449d 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -248,7 +248,7 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) { return s; } -Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) { +Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); return writable_file_->RangeSync(offset, nbytes); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 720979099f..c10cde2abe 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -162,7 +162,7 @@ class WritableFileWriter { Status WriteUnbuffered(); // Normal write Status WriteBuffered(const char* data, size_t size); - Status RangeSync(off_t offset, off_t nbytes); + Status RangeSync(uint64_t offset, uint64_t nbytes); size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 370f523926..69b8cfea88 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -47,8 +47,8 @@ TEST_F(WritableFileWriterTest, RangeSync) { } protected: - Status Allocate(off_t offset, off_t len) override { return Status::OK(); } - Status RangeSync(off_t offset, off_t nbytes) override { + Status Allocate(uint64_t offset, uint64_t len) override { return Status::OK(); } + Status RangeSync(uint64_t offset, uint64_t nbytes) override { EXPECT_EQ(offset % 4096, 0u); EXPECT_EQ(nbytes % 4096, 0u); diff --git a/util/io_posix.cc b/util/io_posix.cc index 0854ab013f..dd41e2a035 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -478,12 +478,15 @@ Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) { } #ifdef ROCKSDB_FALLOCATE_PRESENT -Status PosixMmapFile::Allocate(off_t offset, off_t len) { +Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds); int alloc_status = 0; if (allow_fallocate_) { alloc_status = fallocate( - fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); } if (alloc_status == 0) { return Status::OK(); @@ -606,13 +609,16 @@ Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) { } #ifdef ROCKSDB_FALLOCATE_PRESENT -Status PosixWritableFile::Allocate(off_t offset, off_t len) { +Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds); IOSTATS_TIMER_GUARD(allocate_nanos); int alloc_status = 0; if (allow_fallocate_) { alloc_status = fallocate( - fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, offset, len); + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); } if (alloc_status == 0) { return Status::OK(); @@ -621,8 +627,11 @@ Status PosixWritableFile::Allocate(off_t offset, off_t len) { } } -Status PosixWritableFile::RangeSync(off_t offset, off_t nbytes) { - if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { +Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) { + assert(offset <= std::numeric_limits::max()); + assert(nbytes <= std::numeric_limits::max()); + if (sync_file_range(fd_, static_cast(offset), + static_cast(nbytes), SYNC_FILE_RANGE_WRITE) == 0) { return Status::OK(); } else { return IOError(filename_, errno); diff --git a/util/io_posix.h b/util/io_posix.h index 0e5da39d6b..2a45d10ffe 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -90,8 +90,8 @@ class PosixWritableFile : public WritableFile { virtual uint64_t GetFileSize() override; virtual Status InvalidateCache(size_t offset, size_t length) override; #ifdef ROCKSDB_FALLOCATE_PRESENT - virtual Status Allocate(off_t offset, off_t len) override; - virtual Status RangeSync(off_t offset, off_t nbytes) override; + virtual Status Allocate(uint64_t offset, uint64_t len) override; + virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override; virtual size_t GetUniqueId(char* id, size_t max_size) const override; #endif }; @@ -157,7 +157,7 @@ class PosixMmapFile : public WritableFile { virtual uint64_t GetFileSize() override; virtual Status InvalidateCache(size_t offset, size_t length) override; #ifdef ROCKSDB_FALLOCATE_PRESENT - virtual Status Allocate(off_t offset, off_t len) override; + virtual Status Allocate(uint64_t offset, uint64_t len) override; #endif }; From e114f0abb850c31f794337bf249574ccc8cee7ca Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 10 Nov 2015 22:58:01 -0800 Subject: [PATCH 4/9] Enable RocksDB to persist Options file. Summary: This patch allows rocksdb to persist options into a file on DB::Open, SetOptions, and Create / Drop ColumnFamily. Options files are created under the same directory as the rocksdb instance. In addition, this patch also adds a fail_if_missing_options_file in DBOptions that makes any function call return non-ok status when it is not able to persist options properly. // If true, then DB::Open / CreateColumnFamily / DropColumnFamily // / SetOptions will fail if options file is not detected or properly // persisted. // // DEFAULT: false bool fail_if_missing_options_file; Options file names are formatted as OPTIONS-, and RocksDB will always keep the latest two options files. Test Plan: Add options_file_test. options_test column_family_test Reviewers: igor, IslamAbdelRahman, sdong, anthony Reviewed By: anthony Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D48285 --- CMakeLists.txt | 1 + HISTORY.md | 1 + Makefile | 4 + db/column_family_test.cc | 1 + db/db_impl.cc | 170 ++++++++++++++++++++- db/db_impl.h | 12 ++ db/db_test_util.cc | 1 + db/filename.cc | 40 ++++- db/filename.h | 16 +- db/options_file_test.cc | 116 ++++++++++++++ include/rocksdb/options.h | 7 + include/rocksdb/slice.h | 10 ++ include/rocksdb/status.h | 2 +- src.mk | 1 + util/options.cc | 14 +- util/options_helper.cc | 66 ++++++++ util/options_helper.h | 3 + utilities/backupable/backupable_db_test.cc | 8 +- 18 files changed, 455 insertions(+), 18 deletions(-) create mode 100644 db/options_file_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 808f2a64a3..ad99eba364 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -329,6 +329,7 @@ set(TESTS db/memtable_list_test.cc db/merge_test.cc db/merge_helper_test.cc + db/options_file_test.cc db/perf_context_test.cc db/plain_table_db_test.cc db/prefix_test.cc diff --git a/HISTORY.md b/HISTORY.md index f4227078ce..a54c13da42 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Introduce CreateLoggerFromOptions(), this function create a Logger for provided DBOptions. * Add GetAggregatedIntProperty(), which returns the sum of the GetIntProperty of all the column families. * Add MemoryUtil in rocksdb/utilities/memory.h. It currently offers a way to get the memory usage by type from a list rocksdb instances. +* RocksDB will now persist options under the same directory as the RocksDB database on successful DB::Open, CreateColumnFamily, DropColumnFamily, and SetOptions. ### Public API Changes * CompactionFilter::Context includes information of Column Family ID * The need-compaction hint given by TablePropertiesCollector::NeedCompact() will be persistent and recoverable after DB recovery. This introduces a breaking format change. If you use this experimental feature, including NewCompactOnDeletionCollectorFactory() in the new version, you may not be able to directly downgrade the DB back to version 4.0 or lower. diff --git a/Makefile b/Makefile index bbb59891fb..aae039663b 100644 --- a/Makefile +++ b/Makefile @@ -275,6 +275,7 @@ TESTS = \ memory_test \ merge_test \ merger_test \ + options_file_test \ redis_test \ reduce_levels_test \ plain_table_db_test \ @@ -892,6 +893,9 @@ merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) merger_test: table/merger_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +options_file_test: db/options_file_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 938c4121ac..e776ae115a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -57,6 +57,7 @@ class ColumnFamilyTest : public testing::Test { env_ = new EnvCounter(Env::Default()); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; + db_options_.fail_if_options_file_error = true; db_options_.env = env_; DestroyDB(dbname_, Options(db_options_, column_family_options_)); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 40b7ae8173..a250dbbcbb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -84,6 +85,8 @@ #include "util/log_buffer.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/options_helper.h" +#include "util/options_parser.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -734,8 +737,12 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { // Also, SetCurrentFile creates a temp file when writing out new // manifest, which is equal to state.pending_manifest_file_number. We // should not delete that file + // + // TODO(yhchiang): carefully modify the third condition to safely + // remove the temp options files. keep = (sst_live_map.find(number) != sst_live_map.end()) || - (number == state.pending_manifest_file_number); + (number == state.pending_manifest_file_number) || + (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); break; case kInfoLogFile: keep = true; @@ -747,6 +754,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { case kDBLockFile: case kIdentityFile: case kMetaDatabase: + case kOptionsFile: keep = true; break; } @@ -1922,6 +1930,19 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, new_options = *cfd->GetLatestMutableCFOptions(); } } + if (s.ok()) { + Status persist_options_status = WriteOptionsFile(); + if (!persist_options_status.ok()) { + if (db_options_.fail_if_options_file_error) { + s = Status::IOError( + "SetOptions succeeded, but unable to persist options", + persist_options_status.ToString()); + } + Warn(db_options_.info_log, + "Unable to persist options in SetOptions() -- %s", + persist_options_status.ToString().c_str()); + } + } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "SetOptions() on column family [%s], inputs:", @@ -3458,6 +3479,18 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // this is outside the mutex if (s.ok()) { + Status persist_options_status = WriteOptionsFile(); + if (!persist_options_status.ok()) { + if (db_options_.fail_if_options_file_error) { + s = Status::IOError( + "ColumnFamily has been created, but unable to persist" + "options in CreateColumnFamily()", + persist_options_status.ToString().c_str()); + } + Warn(db_options_.info_log, + "Unable to persist options in CreateColumnFamily() -- %s", + persist_options_status.ToString().c_str()); + } NewThreadStatusCfInfo( reinterpret_cast(*handle)->cfd()); } @@ -3515,6 +3548,18 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * mutable_cf_options->max_write_buffer_number; + auto options_persist_status = WriteOptionsFile(); + if (!options_persist_status.ok()) { + if (db_options_.fail_if_options_file_error) { + s = Status::IOError( + "ColumnFamily has been dropped, but unable to persist " + "options in DropColumnFamily()", + options_persist_status.ToString().c_str()); + } + Warn(db_options_.info_log, + "Unable to persist options in DropColumnFamily() -- %s", + options_persist_status.ToString().c_str()); + } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); @@ -4931,6 +4976,19 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, impl); LogFlush(impl->db_options_.info_log); + auto persist_options_status = impl->WriteOptionsFile(); + if (!persist_options_status.ok()) { + if (db_options.fail_if_options_file_error) { + s = Status::IOError( + "DB::Open() failed --- Unable to persist Options file", + persist_options_status.ToString()); + } + Warn(impl->db_options_.info_log, + "Unable to persist options in DB::Open() -- %s", + persist_options_status.ToString().c_str()); + } + } + if (s.ok()) { *dbptr = impl; } else { for (auto* h : *handles) { @@ -4938,6 +4996,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } handles->clear(); delete impl; + *dbptr = nullptr; } return s; } @@ -5034,6 +5093,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } } + // ignore case where no archival directory is present. env->DeleteDir(archivedir); @@ -5045,6 +5105,114 @@ Status DestroyDB(const std::string& dbname, const Options& options) { return result; } +Status DBImpl::WriteOptionsFile() { +#ifndef ROCKSDB_LITE + std::string file_name; + Status s = WriteOptionsToTempFile(&file_name); + if (!s.ok()) { + return s; + } + s = RenameTempFileToOptionsFile(file_name); + return s; +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + +Status DBImpl::WriteOptionsToTempFile(std::string* file_name) { +#ifndef ROCKSDB_LITE + std::vector cf_names; + std::vector cf_opts; + { + InstrumentedMutexLock l(&mutex_); + // This part requires mutex to protect the column family options + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + cf_names.push_back(cfd->GetName()); + cf_opts.push_back(BuildColumnFamilyOptions( + *cfd->options(), *cfd->GetLatestMutableCFOptions())); + } + } + *file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber()); + + Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, + *file_name, GetEnv()); + return s; +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + +#ifndef ROCKSDB_LITE +namespace { +void DeleteOptionsFilesHelper(const std::map& filenames, + const size_t num_files_to_keep, + const std::shared_ptr& info_log, + Env* env) { + if (filenames.size() <= num_files_to_keep) { + return; + } + for (auto iter = std::next(filenames.begin(), num_files_to_keep); + iter != filenames.end(); ++iter) { + if (!env->DeleteFile(iter->second).ok()) { + Warn(info_log, "Unable to delete options file %s", iter->second.c_str()); + } + } +} +} // namespace +#endif // !ROCKSDB_LITE + +Status DBImpl::DeleteObsoleteOptionsFiles() { +#ifndef ROCKSDB_LITE + options_files_mutex_.AssertHeld(); + + std::vector filenames; + // use ordered map to store keep the filenames sorted from the newest + // to the oldest. + std::map options_filenames; + Status s; + s = GetEnv()->GetChildren(GetName(), &filenames); + if (!s.ok()) { + return s; + } + for (auto& filename : filenames) { + uint64_t file_number; + FileType type; + if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) { + options_filenames.insert( + {std::numeric_limits::max() - file_number, + GetName() + "/" + filename}); + } + } + + // Keeps the latest 2 Options file + const size_t kNumOptionsFilesKept = 2; + DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept, + db_options_.info_log, GetEnv()); + return Status::OK(); +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + +Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) { +#ifndef ROCKSDB_LITE + InstrumentedMutexLock l(&options_files_mutex_); + Status s; + std::string options_file_name = + OptionsFileName(GetName(), versions_->NewFileNumber()); + // Retry if the file name happen to conflict with an existing one. + s = GetEnv()->RenameFile(file_name, options_file_name); + + DeleteObsoleteOptionsFiles(); + return s; +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + #if ROCKSDB_USING_THREAD_STATUS void DBImpl::NewThreadStatusCfInfo( diff --git a/db/db_impl.h b/db/db_impl.h index 3c538dac15..b59cadfc19 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -398,6 +398,13 @@ class DBImpl : public DB { SuperVersion* super_version, Arena* arena); + // The following options file related functions should not be + // called while DB mutex is held. + Status WriteOptionsFile(); + Status WriteOptionsToTempFile(std::string* file_name); + Status RenameTempFileToOptionsFile(const std::string& file_name); + Status DeleteObsoleteOptionsFiles(); + void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id, TableProperties prop); @@ -552,8 +559,13 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; + // The mutex for options file related operations. + // NOTE: should never acquire options_file_mutex_ and mutex_ at the + // same time. + InstrumentedMutex options_files_mutex_; // State below is protected by mutex_ InstrumentedMutex mutex_; + std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 3563988718..6cfb1781a6 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -368,6 +368,7 @@ Options DBTestBase::CurrentOptions( } options.env = env_; options.create_if_missing = true; + options.fail_if_options_file_error = true; return options; } diff --git a/db/filename.cc b/db/filename.cc index f57b178b91..32cd8758aa 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -21,6 +21,7 @@ #include "util/file_reader_writer.h" #include "util/logging.h" #include "util/stop_watch.h" +#include "util/string_util.h" #include "util/sync_point.h" namespace rocksdb { @@ -47,8 +48,9 @@ static size_t GetInfoLogPrefix(const std::string& path, char* dest, int len) { path[i] == '_'){ dest[write_idx++] = path[i]; } else { - if (i > 0) + if (i > 0) { dest[write_idx++] = '_'; + } } i++; } @@ -146,7 +148,7 @@ std::string LockFileName(const std::string& dbname) { } std::string TempFileName(const std::string& dbname, uint64_t number) { - return MakeFileName(dbname, number, "dbtmp"); + return MakeFileName(dbname, number, kTempFileNameSuffix.c_str()); } InfoLogPrefix::InfoLogPrefix(bool has_log_dir, @@ -186,6 +188,21 @@ std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts, return log_dir + "/" + info_log_prefix.buf + ".old." + buf; } +std::string OptionsFileName(const std::string& dbname, uint64_t file_num) { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "%s%06" PRIu64, + kOptionsFileNamePrefix.c_str(), file_num); + return dbname + "/" + buffer; +} + +std::string TempOptionsFileName(const std::string& dbname, uint64_t file_num) { + char buffer[256]; + snprintf(buffer, sizeof(buffer), "%s%06" PRIu64 ".%s", + kOptionsFileNamePrefix.c_str(), file_num, + kTempFileNameSuffix.c_str()); + return dbname + "/" + buffer; +} + std::string MetaDatabaseName(const std::string& dbname, uint64_t number) { char buf[100]; snprintf(buf, sizeof(buf), "/METADB-%llu", @@ -206,6 +223,8 @@ std::string IdentityFileName(const std::string& dbname) { // dbname/MANIFEST-[0-9]+ // dbname/[0-9]+.(log|sst) // dbname/METADB-[0-9]+ +// dbname/OPTIONS-[0-9]+ +// dbname/OPTIONS-[0-9]+.dbtmp // Disregards / at the beginning bool ParseFileName(const std::string& fname, uint64_t* number, @@ -268,6 +287,21 @@ bool ParseFileName(const std::string& fname, uint64_t* number, } *type = kMetaDatabase; *number = num; + } else if (rest.starts_with(kOptionsFileNamePrefix)) { + uint64_t ts_suffix; + bool is_temp_file = false; + rest.remove_prefix(kOptionsFileNamePrefix.size()); + const std::string kTempFileNameSuffixWithDot = + std::string(".") + kTempFileNameSuffix; + if (rest.ends_with(kTempFileNameSuffixWithDot)) { + rest.remove_suffix(kTempFileNameSuffixWithDot.size()); + is_temp_file = true; + } + if (!ConsumeDecimalNumber(&rest, &ts_suffix)) { + return false; + } + *number = ts_suffix; + *type = is_temp_file ? kTempFile : kOptionsFile; } else { // Avoid strtoull() to keep filename format independent of the // current locale @@ -302,7 +336,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number, } else if (suffix == Slice(kRocksDbTFileExt) || suffix == Slice(kLevelDbTFileExt)) { *type = kTableFile; - } else if (suffix == Slice("dbtmp")) { + } else if (suffix == Slice(kTempFileNameSuffix)) { *type = kTempFile; } else { return false; diff --git a/db/filename.h b/db/filename.h index 926f027de9..f7196c9f22 100644 --- a/db/filename.h +++ b/db/filename.h @@ -36,7 +36,8 @@ enum FileType { kTempFile, kInfoLogFile, // Either the current one, or an old one kMetaDatabase, - kIdentityFile + kIdentityFile, + kOptionsFile }; // Return the name of the log file with the specified number @@ -114,6 +115,19 @@ extern std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts, const std::string& db_path = "", const std::string& log_dir = ""); +static const std::string kOptionsFileNamePrefix = "OPTIONS-"; +static const std::string kTempFileNameSuffix = "dbtmp"; + +// Return a options file name given the "dbname" and file number. +// Format: OPTIONS-[number].dbtmp +extern std::string OptionsFileName(const std::string& dbname, + uint64_t file_num); + +// Return a temp options file name given the "dbname" and file number. +// Format: OPTIONS-[number] +extern std::string TempOptionsFileName(const std::string& dbname, + uint64_t file_num); + // Return the name to use for a metadatabase. The result will be prefixed with // "dbname". extern std::string MetaDatabaseName(const std::string& dbname, diff --git a/db/options_file_test.cc b/db/options_file_test.cc new file mode 100644 index 0000000000..808e492be6 --- /dev/null +++ b/db/options_file_test.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#include + +#include "db/db_impl.h" +#include "db/db_test_util.h" +#include "rocksdb/options.h" +#include "rocksdb/table.h" +#include "util/testharness.h" + +namespace rocksdb { +class OptionsFileTest : public testing::Test { + public: + OptionsFileTest() : dbname_(test::TmpDir() + "/options_file_test") {} + + std::string dbname_; +}; + +namespace { +void UpdateOptionsFiles(DB* db, + std::unordered_set* filename_history, + int* options_files_count) { + std::vector filenames; + db->GetEnv()->GetChildren(db->GetName(), &filenames); + uint64_t number; + FileType type; + *options_files_count = 0; + for (auto filename : filenames) { + if (ParseFileName(filename, &number, &type) && type == kOptionsFile) { + filename_history->insert(filename); + (*options_files_count)++; + } + } +} + +// Verify whether the current Options Files are the latest ones. +void VerifyOptionsFileName( + DB* db, const std::unordered_set& past_filenames) { + std::vector filenames; + std::unordered_set current_filenames; + db->GetEnv()->GetChildren(db->GetName(), &filenames); + uint64_t number; + FileType type; + for (auto filename : filenames) { + if (ParseFileName(filename, &number, &type) && type == kOptionsFile) { + current_filenames.insert(filename); + } + } + for (auto past_filename : past_filenames) { + if (current_filenames.find(past_filename) != current_filenames.end()) { + continue; + } + for (auto filename : current_filenames) { + ASSERT_GT(filename, past_filename); + } + } +} +} // namespace + +TEST_F(OptionsFileTest, NumberOfOptionsFiles) { + const int kReopenCount = 20; + Options opt; + opt.create_if_missing = true; + DestroyDB(dbname_, opt); + std::unordered_set filename_history; + DB* db; + for (int i = 0; i < kReopenCount; ++i) { + ASSERT_OK(DB::Open(opt, dbname_, &db)); + int num_options_files = 0; + UpdateOptionsFiles(db, &filename_history, &num_options_files); + ASSERT_GT(num_options_files, 0); + ASSERT_LE(num_options_files, 2); + // Make sure we always keep the latest option files. + VerifyOptionsFileName(db, filename_history); + delete db; + } +} + +TEST_F(OptionsFileTest, OptionsFileName) { + const uint64_t kOptionsFileNum = 12345; + uint64_t number; + FileType type; + + auto options_file_name = OptionsFileName("", kOptionsFileNum); + ASSERT_TRUE(ParseFileName(options_file_name, &number, &type, nullptr)); + ASSERT_EQ(type, kOptionsFile); + ASSERT_EQ(number, kOptionsFileNum); + + const uint64_t kTempOptionsFileNum = 54352; + auto temp_options_file_name = TempOptionsFileName("", kTempOptionsFileNum); + ASSERT_TRUE(ParseFileName(temp_options_file_name, &number, &type, nullptr)); + ASSERT_NE(temp_options_file_name.find(kTempFileNameSuffix), + std::string::npos); + ASSERT_EQ(type, kTempFile); + ASSERT_EQ(number, kTempOptionsFileNum); +} +} // namespace rocksdb + +int main(int argc, char** argv) { +#if !(defined NDEBUG) || !defined(OS_WIN) + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#else + return 0; +#endif // !(defined NDEBUG) || !defined(OS_WIN) +} +#else +int main(int argc, char** argv) { + printf("Skipped as Options file is not supported in RocksDBLite.\n"); + return 0; +} +#endif // !ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 07678b667f..f834a94ace 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1172,6 +1172,13 @@ struct DBOptions { // currently. const WalFilter* wal_filter; #endif // ROCKSDB_LITE + + // If true, then DB::Open / CreateColumnFamily / DropColumnFamily + // / SetOptions will fail if options file is not detected or properly + // persisted. + // + // DEFAULT: false + bool fail_if_options_file_error; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index ae3139cfd6..3d39f3a046 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -73,6 +73,11 @@ class Slice { size_ -= n; } + void remove_suffix(size_t n) { + assert(n <= size()); + size_ -= n; + } + // Return a string that contains the copy of the referenced data. std::string ToString(bool hex = false) const; @@ -88,6 +93,11 @@ class Slice { (memcmp(data_, x.data_, x.size_) == 0)); } + bool ends_with(const Slice& x) const { + return ((size_ >= x.size_) && + (memcmp(data_ + size_ - x.size_, x.data_, x.size_) == 0)); + } + // Compare two slices and returns the first byte where they differ size_t difference_offset(const Slice& b) const; diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index e8e7970ccf..f3f652af74 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -48,7 +48,7 @@ class Status { kAborted = 10, kBusy = 11, kExpired = 12, - kTryAgain = 13 + kTryAgain = 13, }; Code code() const { return code_; } diff --git a/src.mk b/src.mk index f357cdd93d..02fed6ed75 100644 --- a/src.mk +++ b/src.mk @@ -206,6 +206,7 @@ TEST_BENCH_SOURCES = \ db/manual_compaction_test.cc \ db/memtablerep_bench.cc \ db/merge_test.cc \ + db/options_file_test.cc \ db/perf_context_test.cc \ db/plain_table_db_test.cc \ db/prefix_test.cc \ diff --git a/util/options.cc b/util/options.cc index 126e505cfb..3a90cd8609 100644 --- a/util/options.cc +++ b/util/options.cc @@ -259,12 +259,11 @@ DBOptions::DBOptions() enable_thread_tracking(false), delayed_write_rate(1024U * 1024U), skip_stats_update_on_db_open(false), - wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) + wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords), #ifndef ROCKSDB_LITE - , - wal_filter(nullptr) + wal_filter(nullptr), #endif // ROCKSDB_LITE -{ + fail_if_options_file_error(false) { } DBOptions::DBOptions(const Options& options) @@ -323,12 +322,11 @@ DBOptions::DBOptions(const Options& options) delayed_write_rate(options.delayed_write_rate), skip_stats_update_on_db_open(options.skip_stats_update_on_db_open), wal_recovery_mode(options.wal_recovery_mode), - row_cache(options.row_cache) + row_cache(options.row_cache), #ifndef ROCKSDB_LITE - , - wal_filter(options.wal_filter) + wal_filter(options.wal_filter), #endif // ROCKSDB_LITE -{ + fail_if_options_file_error(options.fail_if_options_file_error) { } static const char* const access_hints[] = { diff --git a/util/options_helper.cc b/util/options_helper.cc index fa8dfb49fb..3ff13ae31a 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -1315,5 +1315,71 @@ Status GetTableFactoryFromMap( return Status::OK(); } +ColumnFamilyOptions BuildColumnFamilyOptions( + const Options& options, const MutableCFOptions& mutable_cf_options) { + ColumnFamilyOptions cf_opts(options); + + // Memtable related options + cf_opts.write_buffer_size = mutable_cf_options.write_buffer_size; + cf_opts.max_write_buffer_number = mutable_cf_options.max_write_buffer_number; + cf_opts.arena_block_size = mutable_cf_options.arena_block_size; + cf_opts.memtable_prefix_bloom_bits = + mutable_cf_options.memtable_prefix_bloom_bits; + cf_opts.memtable_prefix_bloom_probes = + mutable_cf_options.memtable_prefix_bloom_probes; + cf_opts.memtable_prefix_bloom_huge_page_tlb_size = + mutable_cf_options.memtable_prefix_bloom_huge_page_tlb_size; + cf_opts.max_successive_merges = mutable_cf_options.max_successive_merges; + cf_opts.filter_deletes = mutable_cf_options.filter_deletes; + cf_opts.inplace_update_num_locks = + mutable_cf_options.inplace_update_num_locks; + + // Compaction related options + cf_opts.disable_auto_compactions = + mutable_cf_options.disable_auto_compactions; + cf_opts.soft_rate_limit = mutable_cf_options.soft_rate_limit; + cf_opts.level0_file_num_compaction_trigger = + mutable_cf_options.level0_file_num_compaction_trigger; + cf_opts.level0_slowdown_writes_trigger = + mutable_cf_options.level0_slowdown_writes_trigger; + cf_opts.level0_stop_writes_trigger = + mutable_cf_options.level0_stop_writes_trigger; + cf_opts.max_grandparent_overlap_factor = + mutable_cf_options.max_grandparent_overlap_factor; + cf_opts.expanded_compaction_factor = + mutable_cf_options.expanded_compaction_factor; + cf_opts.source_compaction_factor = + mutable_cf_options.source_compaction_factor; + cf_opts.target_file_size_base = mutable_cf_options.target_file_size_base; + cf_opts.target_file_size_multiplier = + mutable_cf_options.target_file_size_multiplier; + cf_opts.max_bytes_for_level_base = + mutable_cf_options.max_bytes_for_level_base; + cf_opts.max_bytes_for_level_multiplier = + mutable_cf_options.max_bytes_for_level_multiplier; + + cf_opts.max_bytes_for_level_multiplier_additional.clear(); + for (auto value : + mutable_cf_options.max_bytes_for_level_multiplier_additional) { + cf_opts.max_bytes_for_level_multiplier_additional.emplace_back(value); + } + + cf_opts.verify_checksums_in_compaction = + mutable_cf_options.verify_checksums_in_compaction; + + // Misc options + cf_opts.max_sequential_skip_in_iterations = + mutable_cf_options.max_sequential_skip_in_iterations; + cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks; + cf_opts.compaction_measure_io_stats = + mutable_cf_options.compaction_measure_io_stats; + + cf_opts.table_factory = options.table_factory; + // TODO(yhchiang): find some way to handle the following derived options + // * max_file_size + + return cf_opts; +} + #endif // !ROCKSDB_LITE } // namespace rocksdb diff --git a/util/options_helper.h b/util/options_helper.h index 4dead5507c..c45f51d2fa 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -64,6 +64,9 @@ Status GetTableFactoryFromMap( Status GetStringFromTableFactory(std::string* opts_str, const TableFactory* tf, const std::string& delimiter = "; "); +ColumnFamilyOptions BuildColumnFamilyOptions( + const Options& options, const MutableCFOptions& mutable_cf_options); + enum class OptionType { kBoolean, kInt, diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index cf5b77ac60..13917bd4a1 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1198,18 +1198,18 @@ TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) { OpenDBAndBackupEngine(true); env_->CreateDirIfMissing(backupdir_ + "/shared"); - std::string file_five = backupdir_ + "/shared/000005.sst"; + std::string file_five = backupdir_ + "/shared/000007.sst"; std::string file_five_contents = "I'm not really a sst file"; - // this depends on the fact that 00005.sst is the first file created by the DB + // this depends on the fact that 00007.sst is the first file created by the DB ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents)); FillDB(db_.get(), 0, 100); - // backup overwrites file 000005.sst + // backup overwrites file 000007.sst ASSERT_TRUE(backup_engine_->CreateNewBackup(db_.get(), true).ok()); std::string new_file_five_contents; ASSERT_OK(ReadFileToString(env_, file_five, &new_file_five_contents)); - // file 000005.sst was overwritten + // file 000007.sst was overwritten ASSERT_TRUE(new_file_five_contents != file_five_contents); CloseDBAndBackupEngine(); From e78389b554d0d8faf3bf485f7467b84dd15b25f2 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 10 Nov 2015 23:22:30 -0800 Subject: [PATCH 5/9] Fixed build failure of RocksDBLite test on options_file_test.cc Summary: Fixed build failure of RocksDBLite test Test Plan: options_file_test Reviewers: igor, sdong, anthony, IslamAbdelRahman Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D50595 --- db/options_file_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/db/options_file_test.cc b/db/options_file_test.cc index 808e492be6..86a98899a7 100644 --- a/db/options_file_test.cc +++ b/db/options_file_test.cc @@ -109,6 +109,9 @@ int main(int argc, char** argv) { #endif // !(defined NDEBUG) || !defined(OS_WIN) } #else + +#include + int main(int argc, char** argv) { printf("Skipped as Options file is not supported in RocksDBLite.\n"); return 0; From e11f676e344eb66fae0df2bf9a301b70a222e9dd Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 12 Nov 2015 06:52:43 -0800 Subject: [PATCH 6/9] Add OptionsUtil::LoadOptionsFromFile() API Summary: This patch adds OptionsUtil::LoadOptionsFromFile() and OptionsUtil::LoadLatestOptionsFromDB(), which allow developers to construct DBOptions and ColumnFamilyOptions from a RocksDB options file. Note that most pointer-typed options such as merge_operator will not be constructed. With this API, developers no longer need to remember all the options in order to reopen an existing rocksdb instance like the following: DBOptions db_options; std::vector cf_names; std::vector cf_opts; // Load primitive-typed options from an existing DB OptionsUtil::LoadLatestOptionsFromDB( dbname, &db_options, &cf_names, &cf_opts); // Initialize necessary pointer-typed options cf_opts[0].merge_operator.reset(new MyMergeOperator()); ... // Construct the vector of ColumnFamilyDescriptor std::vector cf_descs; for (size_t i = 0; i < cf_opts.size(); ++i) { cf_descs.emplace_back(cf_names[i], cf_opts[i]); } // Open the DB DB* db = nullptr; std::vector cf_handles; auto s = DB::Open(db_options, dbname, cf_descs, &handles, &db); Test Plan: Augment existing tests in column_family_test options_test db_test Reviewers: igor, IslamAbdelRahman, sdong, anthony Reviewed By: anthony Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D49095 --- CMakeLists.txt | 2 + HISTORY.md | 1 + Makefile | 4 + examples/Makefile | 3 + examples/options_file_example.cc | 113 ++++++ include/rocksdb/table.h | 16 + include/rocksdb/utilities/options_util.h | 65 ++++ src.mk | 2 + table/block_based_table_factory.h | 2 + table/cuckoo_table_factory.h | 4 +- table/plain_table_factory.h | 2 + util/options_helper.cc | 167 ++++++--- util/options_helper.h | 20 + util/options_parser.h | 6 +- util/options_test.cc | 451 +---------------------- util/testutil.cc | 208 ++++++++++- util/testutil.h | 259 +++++++++++++ utilities/options/options_util.cc | 76 ++++ utilities/options/options_util_test.cc | 102 +++++ 19 files changed, 1020 insertions(+), 483 deletions(-) create mode 100644 examples/options_file_example.cc create mode 100644 include/rocksdb/utilities/options_util.h create mode 100644 utilities/options/options_util.cc create mode 100644 utilities/options/options_util_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index ad99eba364..ed9fde0b03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -250,6 +250,7 @@ set(SOURCES utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/put.cc utilities/merge_operators/uint64add.cc + utilities/options/options_util.cc utilities/redis/redis_lists.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc @@ -382,6 +383,7 @@ set(TESTS utilities/geodb/geodb_test.cc utilities/memory/memory_test.cc utilities/merge_operators/string_append/stringappend_test.cc + utilities/options_util_test.cc utilities/redis/redis_lists_test.cc utilities/spatialdb/spatial_db_test.cc utilities/table_properties_collectors/compact_on_deletion_collector_test.cc diff --git a/HISTORY.md b/HISTORY.md index a54c13da42..a6a4018db1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Add GetAggregatedIntProperty(), which returns the sum of the GetIntProperty of all the column families. * Add MemoryUtil in rocksdb/utilities/memory.h. It currently offers a way to get the memory usage by type from a list rocksdb instances. * RocksDB will now persist options under the same directory as the RocksDB database on successful DB::Open, CreateColumnFamily, DropColumnFamily, and SetOptions. +* Introduce LoadLatestOptions() in rocksdb/utilities/options_util.h. This function can construct the latest DBOptions / ColumnFamilyOptions used by the specified RocksDB intance. ### Public API Changes * CompactionFilter::Context includes information of Column Family ID * The need-compaction hint given by TablePropertiesCollector::NeedCompact() will be persistent and recoverable after DB recovery. This introduces a breaking format change. If you use this experimental feature, including NewCompactOnDeletionCollectorFactory() in the new version, you may not be able to directly downgrade the DB back to version 4.0 or lower. diff --git a/Makefile b/Makefile index aae039663b..02ffd56538 100644 --- a/Makefile +++ b/Makefile @@ -303,6 +303,7 @@ TESTS = \ rate_limiter_test \ delete_scheduler_test \ options_test \ + options_util_test \ event_logger_test \ cuckoo_table_builder_test \ cuckoo_table_reader_test \ @@ -929,6 +930,9 @@ compact_files_test: db/compact_files_test.o $(LIBOBJECTS) $(TESTHARNESS) options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +options_util_test: utilities/options/options_util_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/examples/Makefile b/examples/Makefile index fe82d11cd2..5bf0577ffa 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -28,6 +28,9 @@ optimistic_transaction_example: librocksdb optimistic_transaction_example.cc transaction_example: librocksdb transaction_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +options_file_example: librocksdb options_file_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + clean: rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example diff --git a/examples/options_file_example.cc b/examples/options_file_example.cc new file mode 100644 index 0000000000..916ff02f37 --- /dev/null +++ b/examples/options_file_example.cc @@ -0,0 +1,113 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file demonstrates how to use the utility functions defined in +// rocksdb/utilities/options_util.h to open a rocksdb database without +// remembering all the rocksdb options. +#include +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/table.h" +#include "rocksdb/utilities/options_util.h" + +using namespace rocksdb; + +std::string kDBPath = "/tmp/rocksdb_options_file_example"; + +namespace { +// A dummy compaction filter +class DummyCompactionFilter : public CompactionFilter { + public: + virtual ~DummyCompactionFilter() {} + virtual bool Filter(int level, const Slice& key, const Slice& existing_value, + std::string* new_value, bool* value_changed) const { + return false; + } + virtual const char* Name() const { return "DummyCompactionFilter"; } +}; + +} // namespace + +int main() { + DBOptions db_opt; + db_opt.create_if_missing = true; + + std::vector cf_descs; + cf_descs.push_back({kDefaultColumnFamilyName, ColumnFamilyOptions()}); + cf_descs.push_back({"new_cf", ColumnFamilyOptions()}); + + // initialize BlockBasedTableOptions + auto cache = NewLRUCache(1 * 1024 * 1024 * 1024); + BlockBasedTableOptions bbt_opts; + bbt_opts.block_size = 32 * 1024; + bbt_opts.block_cache = cache; + + // initialize column families options + std::unique_ptr compaction_filter; + compaction_filter.reset(new DummyCompactionFilter()); + cf_descs[0].options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts)); + cf_descs[0].options.compaction_filter = compaction_filter.get(); + cf_descs[1].options.table_factory.reset(NewBlockBasedTableFactory(bbt_opts)); + + // destroy and open DB + DB* db; + Status s = DestroyDB(kDBPath, Options(db_opt, cf_descs[0].options)); + assert(s.ok()); + s = DB::Open(Options(db_opt, cf_descs[0].options), kDBPath, &db); + assert(s.ok()); + + // Create column family, and rocksdb will persist the options. + ColumnFamilyHandle* cf; + s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf); + assert(s.ok()); + + // close DB + delete cf; + delete db; + + // In the following code, we will reopen the rocksdb instance using + // the options file stored in the db directory. + + // Load the options file. + DBOptions loaded_db_opt; + std::vector loaded_cf_descs; + s = LoadLatestOptions(kDBPath, Env::Default(), &loaded_db_opt, + &loaded_cf_descs); + assert(s.ok()); + assert(loaded_db_opt.create_if_missing == db_opt.create_if_missing); + + // Initialize pointer options for each column family + for (size_t i = 0; i < loaded_cf_descs.size(); ++i) { + auto* loaded_bbt_opt = reinterpret_cast( + loaded_cf_descs[0].options.table_factory->GetOptions()); + // Expect the same as BlockBasedTableOptions will be loaded form file. + assert(loaded_bbt_opt->block_size == bbt_opts.block_size); + // However, block_cache needs to be manually initialized as documented + // in rocksdb/utilities/options_util.h. + loaded_bbt_opt->block_cache = cache; + } + // In addition, as pointer options are initialized with default value, + // we need to properly initialized all the pointer options if non-defalut + // values are used before calling DB::Open(). + assert(loaded_cf_descs[0].options.compaction_filter == nullptr); + loaded_cf_descs[0].options.compaction_filter = compaction_filter.get(); + + // reopen the db using the loaded options. + std::vector handles; + s = DB::Open(loaded_db_opt, kDBPath, loaded_cf_descs, &handles, &db); + assert(s.ok()); + + // close DB + for (auto* handle : handles) { + delete handle; + } + delete db; +} diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 932c77df55..060c400e67 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -402,6 +402,22 @@ class TableFactory { // Return a string that contains printable format of table configurations. // RocksDB prints configurations at DB Open(). virtual std::string GetPrintableTableOptions() const = 0; + + // Returns the raw pointer of the table options that is used by this + // TableFactory, or nullptr if this function is not supported. + // Since the return value is a raw pointer, the TableFactory owns the + // pointer and the caller should not delete the pointer. + // + // In certan case, it is desirable to alter the underlying options when the + // TableFactory is not used by any open DB by casting the returned pointer + // to the right class. For instance, if BlockBasedTableFactory is used, + // then the pointer can be casted to BlockBasedTableOptions. + // + // Note that changing the underlying TableFactory options while the + // TableFactory is currently used by any open DB is undefined behavior. + // Developers should use DB::SetOption() instead to dynamically change + // options while the DB is open. + virtual void* GetOptions() { return nullptr; } }; #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/utilities/options_util.h b/include/rocksdb/utilities/options_util.h new file mode 100644 index 0000000000..e1b4c3fda5 --- /dev/null +++ b/include/rocksdb/utilities/options_util.h @@ -0,0 +1,65 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +// This file contains utility functions for RocksDB Options. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" + +namespace rocksdb { +// Constructs the DBOptions and ColumnFamilyDescriptors by loading the +// latest RocksDB options file stored in the specified rocksdb database. +// +// Note that the all the pointer options (except table_factory, which will +// be described in more details below) will be initialized with the default +// values. Developers can further initialize them after this function call. +// Below is an example list of pointer options which will be initialized +// +// * env +// * memtable_factory +// * compaction_filter_factory +// * prefix_extractor +// * comparator +// * merge_operator +// * compaction_filter +// +// For table_factory, this function further supports deserializing +// BlockBasedTableFactory and its BlockBasedTableOptions except the +// pointer options of BlockBasedTableOptions (flush_block_policy_factory, +// block_cache, and block_cache_compressed), which will be initialized with +// default values. Developers can further specify these three options by +// casting the return value of TableFactoroy::GetOptions() to +// BlockBasedTableOptions and making necessary changes. +// +// examples/options_file_example.cc demonstrates how to use this function +// to open a RocksDB instance. +// +// @see LoadOptionsFromFile +Status LoadLatestOptions(const std::string& dbpath, Env* env, + DBOptions* db_options, + std::vector* cf_descs); + +// Similar to LoadLatestOptions, this function constructs the DBOptions +// and ColumnFamilyDescriptors based on the specified RocksDB Options file. +// +// @see LoadLatestOptions +Status LoadOptionsFromFile(const std::string& options_file_name, Env* env, + DBOptions* db_options, + std::vector* cf_descs); + +// Returns the latest options file name under the specified db path. +Status GetLatestOptionsFileName(const std::string& dbpath, Env* env, + std::string* options_file_name); + +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src.mk b/src.mk index 02fed6ed75..b6cc81cac8 100644 --- a/src.mk +++ b/src.mk @@ -118,6 +118,7 @@ LIB_SOURCES = \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/uint64add.cc \ + utilities/options/options_util.cc \ utilities/redis/redis_lists.cc \ utilities/spatialdb/spatial_db.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ @@ -254,6 +255,7 @@ TEST_BENCH_SOURCES = \ utilities/geodb/geodb_test.cc \ utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ + utilities/options_util_test.cc \ utilities/redis/redis_lists_test.cc \ utilities/spatialdb/spatial_db_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index 522e31c223..98e622673a 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -58,6 +58,8 @@ class BlockBasedTableFactory : public TableFactory { const BlockBasedTableOptions& GetTableOptions() const; + void* GetOptions() override { return &table_options_; } + private: BlockBasedTableOptions table_options_; }; diff --git a/table/cuckoo_table_factory.h b/table/cuckoo_table_factory.h index b28342c14e..3f89ca86de 100644 --- a/table/cuckoo_table_factory.h +++ b/table/cuckoo_table_factory.h @@ -72,8 +72,10 @@ class CuckooTableFactory : public TableFactory { std::string GetPrintableTableOptions() const override; + void* GetOptions() override { return &table_options_; } + private: - const CuckooTableOptions table_options_; + CuckooTableOptions table_options_; }; } // namespace rocksdb diff --git a/table/plain_table_factory.h b/table/plain_table_factory.h index 0f5bf79372..301b07a06e 100644 --- a/table/plain_table_factory.h +++ b/table/plain_table_factory.h @@ -168,6 +168,8 @@ class PlainTableFactory : public TableFactory { return Status::OK(); } + void* GetOptions() override { return &table_options_; } + private: PlainTableOptions table_options_; }; diff --git a/util/options_helper.cc b/util/options_helper.cc index 3ff13ae31a..3c545e1152 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -548,7 +548,16 @@ bool SerializeSingleOptionHelper(const char* opt_address, case OptionType::kComparator: { // it's a const pointer of const Comparator* const auto* ptr = reinterpret_cast(opt_address); - *value = *ptr ? (*ptr)->Name() : kNullptrString; + // Since the user-specified comparator will be wrapped by + // InternalKeyComparator, we should persist the user-specified one + // instead of InternalKeyComparator. + const auto* internal_comparator = + dynamic_cast(*ptr); + if (internal_comparator != nullptr) { + *value = internal_comparator->user_comparator()->Name(); + } else { + *value = *ptr ? (*ptr)->Name() : kNullptrString; + } break; } case OptionType::kCompactionFilter: { @@ -804,10 +813,10 @@ Status StringToMap(const std::string& opts_str, return Status::OK(); } -bool ParseColumnFamilyOption(const std::string& name, - const std::string& org_value, - ColumnFamilyOptions* new_options, - bool input_strings_escaped = false) { +Status ParseColumnFamilyOption(const std::string& name, + const std::string& org_value, + ColumnFamilyOptions* new_options, + bool input_strings_escaped = false) { const std::string& value = input_strings_escaped ? UnescapeOptionString(org_value) : org_value; try { @@ -837,7 +846,8 @@ bool ParseColumnFamilyOption(const std::string& name, Status table_opt_s = GetBlockBasedTableOptionsFromString( base_table_options, value, &table_opt); if (!table_opt_s.ok()) { - return false; + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); } new_options->table_factory.reset(NewBlockBasedTableFactory(table_opt)); } else if (name == "plain_table_factory") { @@ -851,50 +861,66 @@ bool ParseColumnFamilyOption(const std::string& name, Status table_opt_s = GetPlainTableOptionsFromString( base_table_options, value, &table_opt); if (!table_opt_s.ok()) { - return false; + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); } new_options->table_factory.reset(NewPlainTableFactory(table_opt)); } else if (name == "compression_opts") { size_t start = 0; size_t end = value.find(':'); if (end == std::string::npos) { - return false; + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); } new_options->compression_opts.window_bits = ParseInt(value.substr(start, end - start)); start = end + 1; end = value.find(':', start); if (end == std::string::npos) { - return false; + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); } new_options->compression_opts.level = ParseInt(value.substr(start, end - start)); start = end + 1; if (start >= value.size()) { - return false; + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); } new_options->compression_opts.strategy = ParseInt(value.substr(start, value.size() - start)); - } else if (name == "compaction_options_universal") { - // TODO(ljin): add support - return false; } else if (name == "compaction_options_fifo") { new_options->compaction_options_fifo.max_table_files_size = ParseUint64(value); } else { auto iter = cf_options_type_info.find(name); if (iter == cf_options_type_info.end()) { - return false; + return Status::InvalidArgument( + "Unable to parse the specified CF option " + name); } const auto& opt_info = iter->second; - return ParseOptionHelper( - reinterpret_cast(new_options) + opt_info.offset, opt_info.type, - value); + if (ParseOptionHelper( + reinterpret_cast(new_options) + opt_info.offset, + opt_info.type, value)) { + return Status::OK(); + } + switch (opt_info.verification) { + case OptionVerificationType::kByName: + return Status::NotSupported( + "Deserializing the specified CF option " + name + + " is not supported"); + case OptionVerificationType::kDeprecated: + return Status::OK(); + default: + return Status::InvalidArgument( + "Unable to parse the specified CF option " + name); + } } } catch (const std::exception&) { - return false; + return Status::InvalidArgument( + "unable to parse the specified option " + name); } - return true; + return Status::OK(); } bool SerializeSingleDBOption(std::string* opt_string, @@ -1037,8 +1063,10 @@ Status GetStringFromTableFactory(std::string* opts_str, const TableFactory* tf, return Status::OK(); } -bool ParseDBOption(const std::string& name, const std::string& org_value, - DBOptions* new_options, bool input_strings_escaped = false) { +Status ParseDBOption(const std::string& name, + const std::string& org_value, + DBOptions* new_options, + bool input_strings_escaped = false) { const std::string& value = input_strings_escaped ? UnescapeOptionString(org_value) : org_value; try { @@ -1048,20 +1076,30 @@ bool ParseDBOption(const std::string& name, const std::string& org_value, } else { auto iter = db_options_type_info.find(name); if (iter == db_options_type_info.end()) { - return false; + return Status::InvalidArgument("Unrecognized option DBOptions:", name); } const auto& opt_info = iter->second; - if (opt_info.verification != OptionVerificationType::kByName && - opt_info.verification != OptionVerificationType::kDeprecated) { - return ParseOptionHelper( - reinterpret_cast(new_options) + opt_info.offset, - opt_info.type, value); + if (ParseOptionHelper( + reinterpret_cast(new_options) + opt_info.offset, + opt_info.type, value)) { + return Status::OK(); + } + switch (opt_info.verification) { + case OptionVerificationType::kByName: + return Status::NotSupported( + "Deserializing the specified DB option " + name + + " is not supported"); + case OptionVerificationType::kDeprecated: + return Status::OK(); + default: + return Status::InvalidArgument( + "Unable to parse the specified DB option " + name); } } } catch (const std::exception&) { - return false; + return Status::InvalidArgument("Unable to parse DBOptions:", name); } - return true; + return Status::OK(); } std::string ParseBlockBasedTableOption(const std::string& name, @@ -1209,16 +1247,36 @@ Status GetColumnFamilyOptionsFromMap( const ColumnFamilyOptions& base_options, const std::unordered_map& opts_map, ColumnFamilyOptions* new_options, bool input_strings_escaped) { + return GetColumnFamilyOptionsFromMapInternal( + base_options, opts_map, new_options, input_strings_escaped); +} + +Status GetColumnFamilyOptionsFromMapInternal( + const ColumnFamilyOptions& base_options, + const std::unordered_map& opts_map, + ColumnFamilyOptions* new_options, bool input_strings_escaped, + std::vector* unsupported_options_names) { assert(new_options); *new_options = base_options; + if (unsupported_options_names) { + unsupported_options_names->clear(); + } for (const auto& o : opts_map) { - if (!ParseColumnFamilyOption(o.first, o.second, new_options, - input_strings_escaped)) { - auto iter = cf_options_type_info.find(o.first); - if (iter == cf_options_type_info.end() || - (iter->second.verification != OptionVerificationType::kByName && - iter->second.verification != OptionVerificationType::kDeprecated)) { - return Status::InvalidArgument("Can't parse option " + o.first); + auto s = ParseColumnFamilyOption(o.first, o.second, new_options, + input_strings_escaped); + if (!s.ok()) { + if (s.IsNotSupported()) { + // If the deserialization of the specified option is not supported + // and an output vector of unsupported_options is provided, then + // we log the name of the unsupported option and proceed. + if (unsupported_options_names != nullptr) { + unsupported_options_names->push_back(o.first); + } + // Note that we still return Status::OK in such case to maintain + // the backward compatibility in the old public API defined in + // rocksdb/convenience.h + } else { + return s; } } } @@ -1241,13 +1299,37 @@ Status GetDBOptionsFromMap( const DBOptions& base_options, const std::unordered_map& opts_map, DBOptions* new_options, bool input_strings_escaped) { + return GetDBOptionsFromMapInternal( + base_options, opts_map, new_options, input_strings_escaped); +} + +Status GetDBOptionsFromMapInternal( + const DBOptions& base_options, + const std::unordered_map& opts_map, + DBOptions* new_options, bool input_strings_escaped, + std::vector* unsupported_options_names) { assert(new_options); *new_options = base_options; + if (unsupported_options_names) { + unsupported_options_names->clear(); + } for (const auto& o : opts_map) { - if (!ParseDBOption(o.first, o.second, new_options, input_strings_escaped)) { - // Note that options with kDeprecated validation will pass ParseDBOption - // and will not hit the below statement. - return Status::InvalidArgument("Can't parse option " + o.first); + auto s = ParseDBOption(o.first, o.second, + new_options, input_strings_escaped); + if (!s.ok()) { + if (s.IsNotSupported()) { + // If the deserialization of the specified option is not supported + // and an output vector of unsupported_options is provided, then + // we log the name of the unsupported option and proceed. + if (unsupported_options_names != nullptr) { + unsupported_options_names->push_back(o.first); + } + // Note that we still return Status::OK in such case to maintain + // the backward compatibility in the old public API defined in + // rocksdb/convenience.h + } else { + return s; + } } } return Status::OK(); @@ -1275,8 +1357,9 @@ Status GetOptionsFromString(const Options& base_options, DBOptions new_db_options(base_options); ColumnFamilyOptions new_cf_options(base_options); for (const auto& o : opts_map) { - if (ParseDBOption(o.first, o.second, &new_db_options)) { - } else if (ParseColumnFamilyOption(o.first, o.second, &new_cf_options)) { + if (ParseDBOption(o.first, o.second, &new_db_options).ok()) { + } else if (ParseColumnFamilyOption( + o.first, o.second, &new_cf_options).ok()) { } else { return Status::InvalidArgument("Can't parse option " + o.first); } diff --git a/util/options_helper.h b/util/options_helper.h index c45f51d2fa..ee885dd30f 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -7,6 +7,8 @@ #include #include +#include + #include "rocksdb/options.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -118,6 +120,24 @@ struct OptionTypeInfo { bool SerializeSingleOptionHelper(const char* opt_address, const OptionType opt_type, std::string* value); +// In addition to its public version defined in rocksdb/convenience.h, +// this further takes an optional output vector "unsupported_options_names", +// which stores the name of all the unsupported options specified in "opts_map". +Status GetDBOptionsFromMapInternal( + const DBOptions& base_options, + const std::unordered_map& opts_map, + DBOptions* new_options, bool input_strings_escaped, + std::vector* unsupported_options_names = nullptr); + +// In addition to its public version defined in rocksdb/convenience.h, +// this further takes an optional output vector "unsupported_options_names", +// which stores the name of all the unsupported options specified in "opts_map". +Status GetColumnFamilyOptionsFromMapInternal( + const ColumnFamilyOptions& base_options, + const std::unordered_map& opts_map, + ColumnFamilyOptions* new_options, bool input_strings_escaped, + std::vector* unsupported_options_names = nullptr); + static std::unordered_map db_options_type_info = { /* // not yet supported diff --git a/util/options_parser.h b/util/options_parser.h index e5dfba17a0..94e69cc2a7 100644 --- a/util/options_parser.h +++ b/util/options_parser.h @@ -80,9 +80,9 @@ class RocksDBOptionsParser { const std::unordered_map* new_opt_map = nullptr, OptionsSanityCheckLevel sanity_check_level = kSanityLevelExactMatch); - static Status VerifyTableFactory(const TableFactory* base_tf, - const TableFactory* file_tf, - OptionsSanityCheckLevel sanity_check_level); + static Status VerifyTableFactory( + const TableFactory* base_tf, const TableFactory* file_tf, + OptionsSanityCheckLevel sanity_check_level = kSanityLevelExactMatch); static Status VerifyBlockBasedTableFactory( const BlockBasedTableFactory* base_tf, diff --git a/util/options_test.cc b/util/options_test.cc index 7940fde187..d1d4ecd21c 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -16,14 +16,8 @@ #include #include "rocksdb/cache.h" -#include "rocksdb/compaction_filter.h" #include "rocksdb/convenience.h" -#include "rocksdb/merge_operator.h" -#include "rocksdb/options.h" -#include "rocksdb/table.h" #include "rocksdb/utilities/leveldb_options.h" -#include "table/block_based_table_factory.h" -#include "table/plain_table_factory.h" #include "util/options_helper.h" #include "util/options_parser.h" #include "util/options_sanity_check.h" @@ -75,163 +69,6 @@ Options PrintAndGetOptions(size_t total_write_buffer_limit, return options; } -class StringEnv : public EnvWrapper { - public: - class SeqStringSource : public SequentialFile { - public: - explicit SeqStringSource(const std::string& data) - : data_(data), offset_(0) {} - ~SeqStringSource() {} - Status Read(size_t n, Slice* result, char* scratch) override { - std::string output; - if (offset_ < data_.size()) { - n = std::min(data_.size() - offset_, n); - memcpy(scratch, data_.data() + offset_, n); - offset_ += n; - *result = Slice(scratch, n); - } else { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - return Status::OK(); - } - Status Skip(uint64_t n) override { - if (offset_ >= data_.size()) { - return Status::InvalidArgument( - "Attemp to read when it already reached eof."); - } - // TODO(yhchiang): Currently doesn't handle the overflow case. - offset_ += n; - return Status::OK(); - } - - private: - std::string data_; - size_t offset_; - }; - - class StringSink : public WritableFile { - public: - explicit StringSink(std::string* contents) - : WritableFile(), contents_(contents) {} - virtual Status Truncate(uint64_t size) override { - contents_->resize(size); - return Status::OK(); - } - virtual Status Close() override { return Status::OK(); } - virtual Status Flush() override { return Status::OK(); } - virtual Status Sync() override { return Status::OK(); } - virtual Status Append(const Slice& slice) override { - contents_->append(slice.data(), slice.size()); - return Status::OK(); - } - - private: - std::string* contents_; - }; - - explicit StringEnv(Env* t) : EnvWrapper(t) {} - virtual ~StringEnv() {} - - const std::string& GetContent(const std::string& f) { return files_[f]; } - - const Status WriteToNewFile(const std::string& file_name, - const std::string& content) { - unique_ptr r; - auto s = NewWritableFile(file_name, &r, EnvOptions()); - if (!s.ok()) { - return s; - } - r->Append(content); - r->Flush(); - r->Close(); - assert(files_[file_name] == content); - return Status::OK(); - } - - // The following text is boilerplate that forwards all methods to target() - Status NewSequentialFile(const std::string& f, unique_ptr* r, - const EnvOptions& options) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist", f); - } - r->reset(new SeqStringSource(iter->second)); - return Status::OK(); - } - Status NewRandomAccessFile(const std::string& f, - unique_ptr* r, - const EnvOptions& options) override { - return Status::NotSupported(); - } - Status NewWritableFile(const std::string& f, unique_ptr* r, - const EnvOptions& options) override { - auto iter = files_.find(f); - if (iter != files_.end()) { - return Status::IOError("The specified file already exists", f); - } - r->reset(new StringSink(&files_[f])); - return Status::OK(); - } - virtual Status NewDirectory(const std::string& name, - unique_ptr* result) override { - return Status::NotSupported(); - } - Status FileExists(const std::string& f) override { - if (files_.find(f) == files_.end()) { - return Status::NotFound(); - } - return Status::OK(); - } - Status GetChildren(const std::string& dir, - std::vector* r) override { - return Status::NotSupported(); - } - Status DeleteFile(const std::string& f) override { - files_.erase(f); - return Status::OK(); - } - Status CreateDir(const std::string& d) override { - return Status::NotSupported(); - } - Status CreateDirIfMissing(const std::string& d) override { - return Status::NotSupported(); - } - Status DeleteDir(const std::string& d) override { - return Status::NotSupported(); - } - Status GetFileSize(const std::string& f, uint64_t* s) override { - auto iter = files_.find(f); - if (iter == files_.end()) { - return Status::NotFound("The specified file does not exist:", f); - } - *s = iter->second.size(); - return Status::OK(); - } - - Status GetFileModificationTime(const std::string& fname, - uint64_t* file_mtime) override { - return Status::NotSupported(); - } - - Status RenameFile(const std::string& s, const std::string& t) override { - return Status::NotSupported(); - } - - Status LinkFile(const std::string& s, const std::string& t) override { - return Status::NotSupported(); - } - - Status LockFile(const std::string& f, FileLock** l) override { - return Status::NotSupported(); - } - - Status UnlockFile(FileLock* l) override { return Status::NotSupported(); } - - protected: - std::unordered_map files_; -}; - class OptionsTest : public testing::Test {}; TEST_F(OptionsTest, LooseCondition) { @@ -736,70 +573,12 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) { ASSERT_TRUE(new_options.rate_limiter.get() != nullptr); } -namespace { -void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { - // boolean options - db_opt->advise_random_on_open = rnd->Uniform(2); - db_opt->allow_mmap_reads = rnd->Uniform(2); - db_opt->allow_mmap_writes = rnd->Uniform(2); - db_opt->allow_os_buffer = rnd->Uniform(2); - db_opt->create_if_missing = rnd->Uniform(2); - db_opt->create_missing_column_families = rnd->Uniform(2); - db_opt->disableDataSync = rnd->Uniform(2); - db_opt->enable_thread_tracking = rnd->Uniform(2); - db_opt->error_if_exists = rnd->Uniform(2); - db_opt->is_fd_close_on_exec = rnd->Uniform(2); - db_opt->paranoid_checks = rnd->Uniform(2); - db_opt->skip_log_error_on_recovery = rnd->Uniform(2); - db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); - db_opt->use_adaptive_mutex = rnd->Uniform(2); - db_opt->use_fsync = rnd->Uniform(2); - db_opt->recycle_log_file_num = rnd->Uniform(2); - - // int options - db_opt->max_background_compactions = rnd->Uniform(100); - db_opt->max_background_flushes = rnd->Uniform(100); - db_opt->max_file_opening_threads = rnd->Uniform(100); - db_opt->max_open_files = rnd->Uniform(100); - db_opt->table_cache_numshardbits = rnd->Uniform(100); - - // size_t options - db_opt->db_write_buffer_size = rnd->Uniform(10000); - db_opt->keep_log_file_num = rnd->Uniform(10000); - db_opt->log_file_time_to_roll = rnd->Uniform(10000); - db_opt->manifest_preallocation_size = rnd->Uniform(10000); - db_opt->max_log_file_size = rnd->Uniform(10000); - - // std::string options - db_opt->db_log_dir = "path/to/db_log_dir"; - db_opt->wal_dir = "path/to/wal_dir"; - - // uint32_t options - db_opt->max_subcompactions = rnd->Uniform(100000); - - // uint64_t options - static const uint64_t uint_max = static_cast(UINT_MAX); - db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000); - db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000); - db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000); - db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000); - db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000); - db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000); - db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000); - db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000); - - // unsigned int options - db_opt->stats_dump_period_sec = rnd->Uniform(100000); -} - -} // namespace - TEST_F(OptionsTest, DBOptionsSerialization) { Options base_options, new_options; Random rnd(301); // Phase 1: Make big change in base_options - RandomInitDBOptions(&base_options, &rnd); + test::RandomInitDBOptions(&base_options, &rnd); // Phase 2: obtain a string from base_option std::string base_options_file_content; @@ -812,212 +591,12 @@ TEST_F(OptionsTest, DBOptionsSerialization) { ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(base_options, new_options)); } -namespace { -CompressionType RandomCompressionType(Random* rnd) { - return static_cast(rnd->Uniform(6)); -} - -void RandomCompressionTypeVector(const size_t count, - std::vector* types, - Random* rnd) { - types->clear(); - for (size_t i = 0; i < count; ++i) { - types->emplace_back(RandomCompressionType(rnd)); - } -} - -const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1) { - int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); - switch (random_num) { - case 0: - return NewFixedPrefixTransform(rnd->Uniform(20) + 1); - case 1: - return NewCappedPrefixTransform(rnd->Uniform(20) + 1); - case 2: - return NewNoopTransform(); - default: - return nullptr; - } -} - -TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1) { - int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(3); - switch (random_num) { - case 0: - return NewPlainTableFactory(); - case 1: - return NewCuckooTableFactory(); - default: - return NewBlockBasedTableFactory(); - } -} - -std::string RandomString(Random* rnd, const size_t len) { - std::stringstream ss; - for (size_t i = 0; i < len; ++i) { - ss << static_cast(rnd->Uniform(26) + 'a'); - } - return ss.str(); -} - -class ChanglingMergeOperator : public MergeOperator { - public: - explicit ChanglingMergeOperator(const std::string& name) - : name_(name + "MergeOperator") {} - ~ChanglingMergeOperator() {} - - void SetName(const std::string& name) { name_ = name; } - - virtual bool FullMerge(const Slice& key, const Slice* existing_value, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - return false; - } - virtual bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - return false; - } - virtual const char* Name() const override { return name_.c_str(); } - - protected: - std::string name_; -}; - -MergeOperator* RandomMergeOperator(Random* rnd) { - return new ChanglingMergeOperator(RandomString(rnd, 10)); -} - -class ChanglingCompactionFilter : public CompactionFilter { - public: - explicit ChanglingCompactionFilter(const std::string& name) - : name_(name + "CompactionFilter") {} - ~ChanglingCompactionFilter() {} - - void SetName(const std::string& name) { name_ = name; } - - bool Filter(int level, const Slice& key, const Slice& existing_value, - std::string* new_value, bool* value_changed) const override { - return false; - } - - const char* Name() const override { return name_.c_str(); } - - private: - std::string name_; -}; - -CompactionFilter* RandomCompactionFilter(Random* rnd) { - return new ChanglingCompactionFilter(RandomString(rnd, 10)); -} - -class ChanglingCompactionFilterFactory : public CompactionFilterFactory { - public: - explicit ChanglingCompactionFilterFactory(const std::string& name) - : name_(name + "CompactionFilterFactory") {} - ~ChanglingCompactionFilterFactory() {} - - void SetName(const std::string& name) { name_ = name; } - - std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { - return std::unique_ptr(); - } - - // Returns a name that identifies this compaction filter factory. - const char* Name() const override { return name_.c_str(); } - - protected: - std::string name_; -}; - -CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) { - return new ChanglingCompactionFilterFactory(RandomString(rnd, 10)); -} - -// Note that the caller is responsible for releasing non-null -// cf_opt->compaction_filter. -void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) { - cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4)); - - // boolean options - cf_opt->compaction_measure_io_stats = rnd->Uniform(2); - cf_opt->disable_auto_compactions = rnd->Uniform(2); - cf_opt->filter_deletes = rnd->Uniform(2); - cf_opt->inplace_update_support = rnd->Uniform(2); - cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2); - cf_opt->optimize_filters_for_hits = rnd->Uniform(2); - cf_opt->paranoid_file_checks = rnd->Uniform(2); - cf_opt->purge_redundant_kvs_while_flush = rnd->Uniform(2); - cf_opt->verify_checksums_in_compaction = rnd->Uniform(2); - - // double options - cf_opt->hard_rate_limit = static_cast(rnd->Uniform(10000)) / 13; - cf_opt->soft_rate_limit = static_cast(rnd->Uniform(10000)) / 13; - - // int options - cf_opt->expanded_compaction_factor = rnd->Uniform(100); - cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100); - cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100); - cf_opt->level0_stop_writes_trigger = rnd->Uniform(100); - cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100); - cf_opt->max_grandparent_overlap_factor = rnd->Uniform(100); - cf_opt->max_mem_compaction_level = rnd->Uniform(100); - cf_opt->max_write_buffer_number = rnd->Uniform(100); - cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); - cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); - cf_opt->num_levels = rnd->Uniform(100); - cf_opt->source_compaction_factor = rnd->Uniform(100); - cf_opt->target_file_size_multiplier = rnd->Uniform(100); - - // size_t options - cf_opt->arena_block_size = rnd->Uniform(10000); - cf_opt->inplace_update_num_locks = rnd->Uniform(10000); - cf_opt->max_successive_merges = rnd->Uniform(10000); - cf_opt->memtable_prefix_bloom_huge_page_tlb_size = rnd->Uniform(10000); - cf_opt->write_buffer_size = rnd->Uniform(10000); - - // uint32_t options - cf_opt->bloom_locality = rnd->Uniform(10000); - cf_opt->memtable_prefix_bloom_bits = rnd->Uniform(10000); - cf_opt->memtable_prefix_bloom_probes = rnd->Uniform(10000); - cf_opt->min_partial_merge_operands = rnd->Uniform(10000); - cf_opt->max_bytes_for_level_base = rnd->Uniform(10000); - - // uint64_t options - static const uint64_t uint_max = static_cast(UINT_MAX); - cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); - cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); - - // unsigned int options - cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); - - // pointer typed options - cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd)); - cf_opt->table_factory.reset(RandomTableFactory(rnd)); - cf_opt->merge_operator.reset(RandomMergeOperator(rnd)); - if (cf_opt->compaction_filter) { - delete cf_opt->compaction_filter; - } - cf_opt->compaction_filter = RandomCompactionFilter(rnd); - cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd)); - - // custom typed options - cf_opt->compression = RandomCompressionType(rnd); - RandomCompressionTypeVector(cf_opt->num_levels, - &cf_opt->compression_per_level, rnd); -} - -} // namespace - TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) { ColumnFamilyOptions base_opt, new_opt; Random rnd(302); // Phase 1: randomly assign base_opt // custom type options - RandomInitCFOptions(&base_opt, &rnd); + test::RandomInitCFOptions(&base_opt, &rnd); // Phase 2: obtain a string from base_opt std::string base_options_file_content; @@ -1036,7 +615,6 @@ TEST_F(OptionsTest, ColumnFamilyOptionsSerialization) { #endif // !ROCKSDB_LITE - Status StringToMap( const std::string& opts_str, std::unordered_map* opts_map); @@ -1238,10 +816,10 @@ TEST_F(OptionsTest, ConvertOptionsTest) { #ifndef ROCKSDB_LITE class OptionsParserTest : public testing::Test { public: - OptionsParserTest() { env_.reset(new StringEnv(Env::Default())); } + OptionsParserTest() { env_.reset(new test::StringEnv(Env::Default())); } protected: - std::unique_ptr env_; + std::unique_ptr env_; }; TEST_F(OptionsParserTest, Comment) { @@ -1485,7 +1063,7 @@ void VerifyCFPointerTypedOptions( // change the name of merge operator back-and-forth { - auto* merge_operator = dynamic_cast( + auto* merge_operator = dynamic_cast( base_cf_opt->merge_operator.get()); if (merge_operator != nullptr) { name_buffer = merge_operator->Name(); @@ -1503,7 +1081,7 @@ void VerifyCFPointerTypedOptions( // change the name of the compaction filter factory back-and-forth { auto* compaction_filter_factory = - dynamic_cast( + dynamic_cast( base_cf_opt->compaction_filter_factory.get()); if (compaction_filter_factory != nullptr) { name_buffer = compaction_filter_factory->Name(); @@ -1573,17 +1151,17 @@ TEST_F(OptionsParserTest, DumpAndParse) { "###rocksdb#1-testcf#2###"}; const int num_cf = static_cast(cf_names.size()); Random rnd(302); - RandomInitDBOptions(&base_db_opt, &rnd); + test::RandomInitDBOptions(&base_db_opt, &rnd); base_db_opt.db_log_dir += "/#odd #but #could #happen #path #/\\\\#OMG"; for (int c = 0; c < num_cf; ++c) { ColumnFamilyOptions cf_opt; Random cf_rnd(0xFB + c); - RandomInitCFOptions(&cf_opt, &cf_rnd); + test::RandomInitCFOptions(&cf_opt, &cf_rnd); if (c < 4) { - cf_opt.prefix_extractor.reset(RandomSliceTransform(&rnd, c)); + cf_opt.prefix_extractor.reset(test::RandomSliceTransform(&rnd, c)); } if (c < 3) { - cf_opt.table_factory.reset(RandomTableFactory(&rnd, c)); + cf_opt.table_factory.reset(test::RandomTableFactory(&rnd, c)); } base_cf_opts.emplace_back(cf_opt); } @@ -1717,7 +1295,7 @@ TEST_F(OptionsSanityCheckTest, SanityCheck) { { for (int tb = 2; tb >= 0; --tb) { // change the table factory - opts.table_factory.reset(RandomTableFactory(&rnd, tb)); + opts.table_factory.reset(test::RandomTableFactory(&rnd, tb)); ASSERT_NOK(SanityCheckCFOptions(opts, kSanityLevelLooselyCompatible)); ASSERT_OK(SanityCheckCFOptions(opts, kSanityLevelNone)); @@ -1731,7 +1309,7 @@ TEST_F(OptionsSanityCheckTest, SanityCheck) { { for (int test = 0; test < 5; ++test) { // change the merge operator - opts.merge_operator.reset(RandomMergeOperator(&rnd)); + opts.merge_operator.reset(test::RandomMergeOperator(&rnd)); ASSERT_NOK(SanityCheckCFOptions(opts, kSanityLevelLooselyCompatible)); ASSERT_OK(SanityCheckCFOptions(opts, kSanityLevelNone)); @@ -1745,7 +1323,7 @@ TEST_F(OptionsSanityCheckTest, SanityCheck) { { for (int test = 0; test < 5; ++test) { // change the compaction filter - opts.compaction_filter = RandomCompactionFilter(&rnd); + opts.compaction_filter = test::RandomCompactionFilter(&rnd); ASSERT_NOK(SanityCheckCFOptions(opts, kSanityLevelExactMatch)); ASSERT_OK(SanityCheckCFOptions(opts, kSanityLevelLooselyCompatible)); @@ -1761,7 +1339,8 @@ TEST_F(OptionsSanityCheckTest, SanityCheck) { { for (int test = 0; test < 5; ++test) { // change the compaction filter factory - opts.compaction_filter_factory.reset(RandomCompactionFilterFactory(&rnd)); + opts.compaction_filter_factory.reset( + test::RandomCompactionFilterFactory(&rnd)); ASSERT_NOK(SanityCheckCFOptions(opts, kSanityLevelExactMatch)); ASSERT_OK(SanityCheckCFOptions(opts, kSanityLevelLooselyCompatible)); diff --git a/util/testutil.cc b/util/testutil.cc index b995a2e531..8db8dac887 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -9,9 +9,11 @@ #include "util/testutil.h" +#include +#include + #include "port/port.h" #include "util/file_reader_writer.h" -#include "util/random.h" namespace rocksdb { namespace test { @@ -153,5 +155,209 @@ std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, return k.Encode().ToString(); } +std::string RandomName(Random* rnd, const size_t len) { + std::stringstream ss; + for (size_t i = 0; i < len; ++i) { + ss << static_cast(rnd->Uniform(26) + 'a'); + } + return ss.str(); +} + +CompressionType RandomCompressionType(Random* rnd) { + return static_cast(rnd->Uniform(6)); +} + +void RandomCompressionTypeVector(const size_t count, + std::vector* types, + Random* rnd) { + types->clear(); + for (size_t i = 0; i < count; ++i) { + types->emplace_back(RandomCompressionType(rnd)); + } +} + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) { + int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewFixedPrefixTransform(rnd->Uniform(20) + 1); + case 1: + return NewCappedPrefixTransform(rnd->Uniform(20) + 1); + case 2: + return NewNoopTransform(); + default: + return nullptr; + } +} + +BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) { + BlockBasedTableOptions opt; + opt.cache_index_and_filter_blocks = rnd->Uniform(2); + opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch + : BlockBasedTableOptions::kHashSearch; + opt.hash_index_allow_collision = rnd->Uniform(2); + opt.checksum = static_cast(rnd->Uniform(3)); + opt.block_size = rnd->Uniform(10000000); + opt.block_size_deviation = rnd->Uniform(100); + opt.block_restart_interval = rnd->Uniform(100); + opt.whole_key_filtering = rnd->Uniform(2); + + return opt; +} + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined) { +#ifndef ROCKSDB_LITE + int random_num = pre_defined >= 0 ? pre_defined : rnd->Uniform(4); + switch (random_num) { + case 0: + return NewPlainTableFactory(); + case 1: + return NewCuckooTableFactory(); + default: + return NewBlockBasedTableFactory(); + } +#else + return NewBlockBasedTableFactory(); +#endif // !ROCKSDB_LITE +} + +MergeOperator* RandomMergeOperator(Random* rnd) { + return new ChanglingMergeOperator(RandomName(rnd, 10)); +} + +CompactionFilter* RandomCompactionFilter(Random* rnd) { + return new ChanglingCompactionFilter(RandomName(rnd, 10)); +} + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd) { + return new ChanglingCompactionFilterFactory(RandomName(rnd, 10)); +} + +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { + // boolean options + db_opt->advise_random_on_open = rnd->Uniform(2); + db_opt->allow_mmap_reads = rnd->Uniform(2); + db_opt->allow_mmap_writes = rnd->Uniform(2); + db_opt->allow_os_buffer = rnd->Uniform(2); + db_opt->create_if_missing = rnd->Uniform(2); + db_opt->create_missing_column_families = rnd->Uniform(2); + db_opt->disableDataSync = rnd->Uniform(2); + db_opt->enable_thread_tracking = rnd->Uniform(2); + db_opt->error_if_exists = rnd->Uniform(2); + db_opt->is_fd_close_on_exec = rnd->Uniform(2); + db_opt->paranoid_checks = rnd->Uniform(2); + db_opt->skip_log_error_on_recovery = rnd->Uniform(2); + db_opt->skip_stats_update_on_db_open = rnd->Uniform(2); + db_opt->use_adaptive_mutex = rnd->Uniform(2); + db_opt->use_fsync = rnd->Uniform(2); + db_opt->recycle_log_file_num = rnd->Uniform(2); + + // int options + db_opt->max_background_compactions = rnd->Uniform(100); + db_opt->max_background_flushes = rnd->Uniform(100); + db_opt->max_file_opening_threads = rnd->Uniform(100); + db_opt->max_open_files = rnd->Uniform(100); + db_opt->table_cache_numshardbits = rnd->Uniform(100); + + // size_t options + db_opt->db_write_buffer_size = rnd->Uniform(10000); + db_opt->keep_log_file_num = rnd->Uniform(10000); + db_opt->log_file_time_to_roll = rnd->Uniform(10000); + db_opt->manifest_preallocation_size = rnd->Uniform(10000); + db_opt->max_log_file_size = rnd->Uniform(10000); + + // std::string options + db_opt->db_log_dir = "path/to/db_log_dir"; + db_opt->wal_dir = "path/to/wal_dir"; + + // uint32_t options + db_opt->max_subcompactions = rnd->Uniform(100000); + + // uint64_t options + static const uint64_t uint_max = static_cast(UINT_MAX); + db_opt->WAL_size_limit_MB = uint_max + rnd->Uniform(100000); + db_opt->WAL_ttl_seconds = uint_max + rnd->Uniform(100000); + db_opt->bytes_per_sync = uint_max + rnd->Uniform(100000); + db_opt->delayed_write_rate = uint_max + rnd->Uniform(100000); + db_opt->delete_obsolete_files_period_micros = uint_max + rnd->Uniform(100000); + db_opt->max_manifest_file_size = uint_max + rnd->Uniform(100000); + db_opt->max_total_wal_size = uint_max + rnd->Uniform(100000); + db_opt->wal_bytes_per_sync = uint_max + rnd->Uniform(100000); + + // unsigned int options + db_opt->stats_dump_period_sec = rnd->Uniform(100000); +} + +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) { + cf_opt->compaction_style = (CompactionStyle)(rnd->Uniform(4)); + + // boolean options + cf_opt->compaction_measure_io_stats = rnd->Uniform(2); + cf_opt->disable_auto_compactions = rnd->Uniform(2); + cf_opt->filter_deletes = rnd->Uniform(2); + cf_opt->inplace_update_support = rnd->Uniform(2); + cf_opt->level_compaction_dynamic_level_bytes = rnd->Uniform(2); + cf_opt->optimize_filters_for_hits = rnd->Uniform(2); + cf_opt->paranoid_file_checks = rnd->Uniform(2); + cf_opt->purge_redundant_kvs_while_flush = rnd->Uniform(2); + cf_opt->verify_checksums_in_compaction = rnd->Uniform(2); + + // double options + cf_opt->hard_rate_limit = static_cast(rnd->Uniform(10000)) / 13; + cf_opt->soft_rate_limit = static_cast(rnd->Uniform(10000)) / 13; + + // int options + cf_opt->expanded_compaction_factor = rnd->Uniform(100); + cf_opt->level0_file_num_compaction_trigger = rnd->Uniform(100); + cf_opt->level0_slowdown_writes_trigger = rnd->Uniform(100); + cf_opt->level0_stop_writes_trigger = rnd->Uniform(100); + cf_opt->max_bytes_for_level_multiplier = rnd->Uniform(100); + cf_opt->max_grandparent_overlap_factor = rnd->Uniform(100); + cf_opt->max_mem_compaction_level = rnd->Uniform(100); + cf_opt->max_write_buffer_number = rnd->Uniform(100); + cf_opt->max_write_buffer_number_to_maintain = rnd->Uniform(100); + cf_opt->min_write_buffer_number_to_merge = rnd->Uniform(100); + cf_opt->num_levels = rnd->Uniform(100); + cf_opt->source_compaction_factor = rnd->Uniform(100); + cf_opt->target_file_size_multiplier = rnd->Uniform(100); + + // size_t options + cf_opt->arena_block_size = rnd->Uniform(10000); + cf_opt->inplace_update_num_locks = rnd->Uniform(10000); + cf_opt->max_successive_merges = rnd->Uniform(10000); + cf_opt->memtable_prefix_bloom_huge_page_tlb_size = rnd->Uniform(10000); + cf_opt->write_buffer_size = rnd->Uniform(10000); + + // uint32_t options + cf_opt->bloom_locality = rnd->Uniform(10000); + cf_opt->memtable_prefix_bloom_bits = rnd->Uniform(10000); + cf_opt->memtable_prefix_bloom_probes = rnd->Uniform(10000); + cf_opt->min_partial_merge_operands = rnd->Uniform(10000); + cf_opt->max_bytes_for_level_base = rnd->Uniform(10000); + + // uint64_t options + static const uint64_t uint_max = static_cast(UINT_MAX); + cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); + cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); + + // unsigned int options + cf_opt->rate_limit_delay_max_milliseconds = rnd->Uniform(10000); + + // pointer typed options + cf_opt->prefix_extractor.reset(RandomSliceTransform(rnd)); + cf_opt->table_factory.reset(RandomTableFactory(rnd)); + cf_opt->merge_operator.reset(RandomMergeOperator(rnd)); + if (cf_opt->compaction_filter) { + delete cf_opt->compaction_filter; + } + cf_opt->compaction_filter = RandomCompactionFilter(rnd); + cf_opt->compaction_filter_factory.reset(RandomCompactionFilterFactory(rnd)); + + // custom typed options + cf_opt->compression = RandomCompressionType(rnd); + RandomCompressionTypeVector(cf_opt->num_levels, + &cf_opt->compression_per_level, rnd); +} + } // namespace test } // namespace rocksdb diff --git a/util/testutil.h b/util/testutil.h index 0373532a87..a47d5abce0 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include @@ -16,8 +17,13 @@ #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/options.h" #include "rocksdb/slice.h" +#include "rocksdb/table.h" +#include "table/block_based_table_factory.h" #include "table/internal_iterator.h" +#include "table/plain_table_factory.h" #include "util/mutexlock.h" #include "util/random.h" @@ -378,5 +384,258 @@ inline std::string EncodeInt(uint64_t x) { return result; } +class StringEnv : public EnvWrapper { + public: + class SeqStringSource : public SequentialFile { + public: + explicit SeqStringSource(const std::string& data) + : data_(data), offset_(0) {} + ~SeqStringSource() {} + Status Read(size_t n, Slice* result, char* scratch) override { + std::string output; + if (offset_ < data_.size()) { + n = std::min(data_.size() - offset_, n); + memcpy(scratch, data_.data() + offset_, n); + offset_ += n; + *result = Slice(scratch, n); + } else { + return Status::InvalidArgument( + "Attemp to read when it already reached eof."); + } + return Status::OK(); + } + Status Skip(uint64_t n) override { + if (offset_ >= data_.size()) { + return Status::InvalidArgument( + "Attemp to read when it already reached eof."); + } + // TODO(yhchiang): Currently doesn't handle the overflow case. + offset_ += n; + return Status::OK(); + } + + private: + std::string data_; + size_t offset_; + }; + + class StringSink : public WritableFile { + public: + explicit StringSink(std::string* contents) + : WritableFile(), contents_(contents) {} + virtual Status Truncate(uint64_t size) override { + contents_->resize(size); + return Status::OK(); + } + virtual Status Close() override { return Status::OK(); } + virtual Status Flush() override { return Status::OK(); } + virtual Status Sync() override { return Status::OK(); } + virtual Status Append(const Slice& slice) override { + contents_->append(slice.data(), slice.size()); + return Status::OK(); + } + + private: + std::string* contents_; + }; + + explicit StringEnv(Env* t) : EnvWrapper(t) {} + virtual ~StringEnv() {} + + const std::string& GetContent(const std::string& f) { return files_[f]; } + + const Status WriteToNewFile(const std::string& file_name, + const std::string& content) { + unique_ptr r; + auto s = NewWritableFile(file_name, &r, EnvOptions()); + if (!s.ok()) { + return s; + } + r->Append(content); + r->Flush(); + r->Close(); + assert(files_[file_name] == content); + return Status::OK(); + } + + // The following text is boilerplate that forwards all methods to target() + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& options) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist", f); + } + r->reset(new SeqStringSource(iter->second)); + return Status::OK(); + } + Status NewRandomAccessFile(const std::string& f, + unique_ptr* r, + const EnvOptions& options) override { + return Status::NotSupported(); + } + Status NewWritableFile(const std::string& f, unique_ptr* r, + const EnvOptions& options) override { + auto iter = files_.find(f); + if (iter != files_.end()) { + return Status::IOError("The specified file already exists", f); + } + r->reset(new StringSink(&files_[f])); + return Status::OK(); + } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) override { + return Status::NotSupported(); + } + Status FileExists(const std::string& f) override { + if (files_.find(f) == files_.end()) { + return Status::NotFound(); + } + return Status::OK(); + } + Status GetChildren(const std::string& dir, + std::vector* r) override { + return Status::NotSupported(); + } + Status DeleteFile(const std::string& f) override { + files_.erase(f); + return Status::OK(); + } + Status CreateDir(const std::string& d) override { + return Status::NotSupported(); + } + Status CreateDirIfMissing(const std::string& d) override { + return Status::NotSupported(); + } + Status DeleteDir(const std::string& d) override { + return Status::NotSupported(); + } + Status GetFileSize(const std::string& f, uint64_t* s) override { + auto iter = files_.find(f); + if (iter == files_.end()) { + return Status::NotFound("The specified file does not exist:", f); + } + *s = iter->second.size(); + return Status::OK(); + } + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override { + return Status::NotSupported(); + } + + Status RenameFile(const std::string& s, const std::string& t) override { + return Status::NotSupported(); + } + + Status LinkFile(const std::string& s, const std::string& t) override { + return Status::NotSupported(); + } + + Status LockFile(const std::string& f, FileLock** l) override { + return Status::NotSupported(); + } + + Status UnlockFile(FileLock* l) override { return Status::NotSupported(); } + + protected: + std::unordered_map files_; +}; + +// Randomly initialize the given DBOptions +void RandomInitDBOptions(DBOptions* db_opt, Random* rnd); + +// Randomly initialize the given ColumnFamilyOptions +// Note that the caller is responsible for releasing non-null +// cf_opt->compaction_filter. +void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd); + +// A dummy merge operator which can change its name +class ChanglingMergeOperator : public MergeOperator { + public: + explicit ChanglingMergeOperator(const std::string& name) + : name_(name + "MergeOperator") {} + ~ChanglingMergeOperator() {} + + void SetName(const std::string& name) { name_ = name; } + + virtual bool FullMerge(const Slice& key, const Slice* existing_value, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override { + return false; + } + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override { + return false; + } + virtual const char* Name() const override { return name_.c_str(); } + + protected: + std::string name_; +}; + +// Returns a dummy merge operator with random name. +MergeOperator* RandomMergeOperator(Random* rnd); + +// A dummy compaction filter which can change its name +class ChanglingCompactionFilter : public CompactionFilter { + public: + explicit ChanglingCompactionFilter(const std::string& name) + : name_(name + "CompactionFilter") {} + ~ChanglingCompactionFilter() {} + + void SetName(const std::string& name) { name_ = name; } + + bool Filter(int level, const Slice& key, const Slice& existing_value, + std::string* new_value, bool* value_changed) const override { + return false; + } + + const char* Name() const override { return name_.c_str(); } + + private: + std::string name_; +}; + +// Returns a dummy compaction filter with a random name. +CompactionFilter* RandomCompactionFilter(Random* rnd); + +// A dummy compaction filter factory which can change its name +class ChanglingCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit ChanglingCompactionFilterFactory(const std::string& name) + : name_(name + "CompactionFilterFactory") {} + ~ChanglingCompactionFilterFactory() {} + + void SetName(const std::string& name) { name_ = name; } + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + return std::unique_ptr(); + } + + // Returns a name that identifies this compaction filter factory. + const char* Name() const override { return name_.c_str(); } + + protected: + std::string name_; +}; + +CompressionType RandomCompressionType(Random* rnd); + +void RandomCompressionTypeVector(const size_t count, + std::vector* types, + Random* rnd); + +CompactionFilterFactory* RandomCompactionFilterFactory(Random* rnd); + +const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1); + +TableFactory* RandomTableFactory(Random* rnd, int pre_defined = -1); + +std::string RandomName(Random* rnd, const size_t len); + } // namespace test } // namespace rocksdb diff --git a/utilities/options/options_util.cc b/utilities/options/options_util.cc new file mode 100644 index 0000000000..6a595c3d82 --- /dev/null +++ b/utilities/options/options_util.cc @@ -0,0 +1,76 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/options_util.h" + +#include "db/filename.h" +#include "rocksdb/options.h" +#include "util/options_parser.h" + +namespace rocksdb { +Status LoadOptionsFromFile(const std::string& file_name, Env* env, + DBOptions* db_options, + std::vector* cf_descs) { + RocksDBOptionsParser parser; + Status s = parser.Parse(file_name, env); + if (!s.ok()) { + return s; + } + + *db_options = *parser.db_opt(); + + const std::vector& cf_names = *parser.cf_names(); + const std::vector& cf_opts = *parser.cf_opts(); + cf_descs->clear(); + for (size_t i = 0; i < cf_opts.size(); ++i) { + cf_descs->push_back({cf_names[i], cf_opts[i]}); + } + return Status::OK(); +} + +Status GetLatestOptionsFileName(const std::string& dbpath, + Env* env, std::string* options_file_name) { + Status s; + std::string latest_file_name; + uint64_t latest_time_stamp = 0; + std::vector file_names; + s = env->GetChildren(dbpath, &file_names); + if (!s.ok()) { + return s; + } + for (auto& file_name : file_names) { + uint64_t time_stamp; + FileType type; + if (ParseFileName(file_name, &time_stamp, &type) && type == kOptionsFile) { + if (time_stamp > latest_time_stamp) { + latest_time_stamp = time_stamp; + latest_file_name = file_name; + } + } + } + if (latest_file_name.size() == 0) { + return Status::NotFound("No options files found in the DB directory."); + } + *options_file_name = latest_file_name; + return Status::OK(); +} + +Status LoadLatestOptions(const std::string& dbpath, Env* env, + DBOptions* db_options, + std::vector* cf_descs) { + std::string options_file_name; + Status s = GetLatestOptionsFileName(dbpath, env, &options_file_name); + if (!s.ok()) { + return s; + } + + return LoadOptionsFromFile(dbpath + "/" + options_file_name, env, + db_options, cf_descs); +} + +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/options/options_util_test.cc b/utilities/options/options_util_test.cc new file mode 100644 index 0000000000..0ad29fb4b8 --- /dev/null +++ b/utilities/options/options_util_test.cc @@ -0,0 +1,102 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include + +#include +#include + +#include "rocksdb/utilities/options_util.h" +#include "util/options_parser.h" +#include "util/random.h" +#include "util/testharness.h" +#include "util/testutil.h" + +#ifndef GFLAGS +bool FLAGS_enable_print = false; +#else +#include +using GFLAGS::ParseCommandLineFlags; +DEFINE_bool(enable_print, false, "Print options generated to console."); +#endif // GFLAGS + +namespace rocksdb { +class OptionsUtilTest : public testing::Test { + public: + OptionsUtilTest() { env_.reset(new test::StringEnv(Env::Default())); } + + protected: + std::unique_ptr env_; +}; + +bool IsBlockBasedTableFactory(TableFactory* tf) { + return tf->Name() == BlockBasedTableFactory().Name(); +} + +TEST_F(OptionsUtilTest, SaveAndLoad) { + const size_t kCFCount = 5; + Random rnd(0xFB); + + DBOptions db_opt; + std::vector cf_names; + std::vector cf_opts; + test::RandomInitDBOptions(&db_opt, &rnd); + for (size_t i = 0; i < kCFCount; ++i) { + cf_names.push_back(i == 0 ? kDefaultColumnFamilyName + : test::RandomName(&rnd, 10)); + cf_opts.emplace_back(); + test::RandomInitCFOptions(&cf_opts.back(), &rnd); + } + + const std::string kFileName = "OPTIONS-123456"; + PersistRocksDBOptions(db_opt, cf_names, cf_opts, kFileName, env_.get()); + + DBOptions loaded_db_opt; + std::vector loaded_cf_descs; + ASSERT_OK(LoadOptionsFromFile(kFileName, env_.get(), &loaded_db_opt, + &loaded_cf_descs)); + + ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(db_opt, loaded_db_opt)); + test::RandomInitDBOptions(&db_opt, &rnd); + ASSERT_NOK(RocksDBOptionsParser::VerifyDBOptions(db_opt, loaded_db_opt)); + + for (size_t i = 0; i < kCFCount; ++i) { + ASSERT_EQ(cf_names[i], loaded_cf_descs[i].name); + ASSERT_OK(RocksDBOptionsParser::VerifyCFOptions( + cf_opts[i], loaded_cf_descs[i].options)); + if (IsBlockBasedTableFactory(cf_opts[i].table_factory.get())) { + ASSERT_OK(RocksDBOptionsParser::VerifyTableFactory( + cf_opts[i].table_factory.get(), + loaded_cf_descs[i].options.table_factory.get())); + } + test::RandomInitCFOptions(&cf_opts[i], &rnd); + ASSERT_NOK(RocksDBOptionsParser::VerifyCFOptions( + cf_opts[i], loaded_cf_descs[i].options)); + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); +#ifdef GFLAGS + ParseCommandLineFlags(&argc, &argv, true); +#endif // GFLAGS + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + printf("Skipped in RocksDBLite as utilities are not supported.\n"); + return 0; +} +#endif // !ROCKSDB_LITE From 56245ddcf56fc30521d25dc25342eae49a716a9b Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 12 Nov 2015 07:43:28 -0800 Subject: [PATCH 7/9] Fixed DBCompactionTest.SkipStatsUpdateTest Summary: DBCompactionTest.SkipStatsUpdateTest relies on the number of files opened during the DB::Open process, but the persisting options file support altered this number and thus makes DBCompactionTest.SkipStatsUpdateTest in certain environment. This patch fixed this test failure. Test Plan: db_compaction_test Reviewers: igor, sdong, anthony, IslamAbdelRahman Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D50637 --- db/db_compaction_test.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 1ff9469661..baf35c6728 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -239,9 +239,12 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) { env_->random_file_open_counter_.store(0); Reopen(options); - // As stats-update is disabled, we expect a very low - // number of random file open. - ASSERT_LT(env_->random_file_open_counter_.load(), 5); + // As stats-update is disabled, we expect a very low number of + // random file open. + // Note that this number must be changed accordingly if we change + // the number of files needed to be opened in the DB::Open process. + const int kMaxFileOpenCount = 10; + ASSERT_LT(env_->random_file_open_counter_.load(), kMaxFileOpenCount); // Repeat the reopen process, but this time we enable // stats-update. @@ -251,7 +254,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) { // Since we do a normal stats update on db-open, there // will be more random open files. - ASSERT_GT(env_->random_file_open_counter_.load(), 5); + ASSERT_GT(env_->random_file_open_counter_.load(), kMaxFileOpenCount); } TEST_F(DBCompactionTest, TestTableReaderForCompaction) { From 6ce42dd0750def33aaa632d064ba0a097ac05b14 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Fri, 6 Nov 2015 07:29:10 -0800 Subject: [PATCH 8/9] Don't merge WriteBatch-es if WAL is disabled Summary: There's no need for WriteImpl to flatten the write batch group into a single WriteBatch if the WAL is disabled. This diff moves the flattening into the WAL step, and skips flattening entirely if it isn't needed. It's good for about 5% speedup on a multi-threaded workload with no WAL. This diff also adds clarifying comments about the chance for partial failure of WriteBatchInternal::InsertInto, and always sets bg_error_ if the memtable state diverges from the logged state or if a WriteBatch succeeds only partially. Benchmark for speedup: db_bench -benchmarks=fillrandom -threads=16 -batch_size=1 -memtablerep=skip_list -value_size=0 --num=200000 -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -disable_auto_compactions --max_write_buffer_number=8 -max_background_flushes=8 --disable_wal --write_buffer_size=160000000 Test Plan: asserts + make check Reviewers: sdong, igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D50583 --- db/db_impl.cc | 82 +++++++++++++++++++++++++-------------- db/write_batch.cc | 74 ++++++++++++++++++----------------- db/write_batch_internal.h | 34 +++++++++++----- 3 files changed, 116 insertions(+), 74 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index a250dbbcbb..155f9096b7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3993,33 +3993,48 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // At this point the mutex is unlocked if (status.ok()) { - WriteBatch* updates = nullptr; - if (write_batch_group.size() == 1) { - updates = write_batch_group[0]; - } else { - updates = &tmp_batch_; - for (size_t i = 0; i < write_batch_group.size(); ++i) { - WriteBatchInternal::Append(updates, write_batch_group[i]); - } + int total_count = 0; + uint64_t total_byte_size = 0; + for (auto b : write_batch_group) { + total_count += WriteBatchInternal::Count(b); + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(b)); } const SequenceNumber current_sequence = last_sequence + 1; - WriteBatchInternal::SetSequence(updates, current_sequence); - int my_batch_count = WriteBatchInternal::Count(updates); - last_sequence += my_batch_count; - const uint64_t batch_size = WriteBatchInternal::ByteSize(updates); + last_sequence += total_count; + // Record statistics - RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count); - RecordTick(stats_, BYTES_WRITTEN, batch_size); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); + if (write_options.disableWAL) { flush_on_destroy_ = true; } - PERF_TIMER_STOP(write_pre_and_post_process_time); uint64_t log_size = 0; if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - Slice log_entry = WriteBatchInternal::Contents(updates); + + WriteBatch* merged_batch = nullptr; + if (write_batch_group.size() == 1) { + merged_batch = write_batch_group[0]; + } else { + // WAL needs all of the batches flattened into a single batch. + // We could avoid copying here with an iov-like AddRecord + // interface + merged_batch = &tmp_batch_; + for (auto b : write_batch_group) { + WriteBatchInternal::Append(merged_batch, b); + } + } + WriteBatchInternal::SetSequence(merged_batch, current_sequence); + + assert(WriteBatchInternal::Count(merged_batch) == total_count); + assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); + + Slice log_entry = WriteBatchInternal::Contents(merged_batch); status = logs_.back().writer->AddRecord(log_entry); total_log_size_ += log_entry.size(); alive_log_files_.back().AddSize(log_entry.size()); @@ -4049,34 +4064,41 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = directories_.GetWalDir()->Fsync(); } } + + if (merged_batch == &tmp_batch_) { + tmp_batch_.Clear(); + } } if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); status = WriteBatchInternal::InsertInto( - updates, column_family_memtables_.get(), - write_options.ignore_missing_column_families, 0, this, false); - // A non-OK status here indicates iteration failure (either in-memory - // writebatch corruption (very bad), or the client specified invalid - // column family). This will later on trigger bg_error_. + write_batch_group, current_sequence, column_family_memtables_.get(), + write_options.ignore_missing_column_families, + /*log_number*/ 0, this, /*dont_filter_deletes*/ false); + + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. // - // Note that existing logic was not sound. Any partial failure writing - // into the memtable would result in a state that some write ops might - // have succeeded in memtable but Status reports error for all writes. + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if (!status.ok() && bg_error_.ok()) { + bg_error_ = status; + } SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); } PERF_TIMER_START(write_pre_and_post_process_time); - if (updates == &tmp_batch_) { - tmp_batch_.Clear(); - } mutex_.Lock(); // internal stats - default_cf_internal_stats_->AddDBStats( - InternalStats::BYTES_WRITTEN, batch_size); + default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN, + total_byte_size); default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, - my_batch_count); + total_count); if (!write_options.disableWAL) { if (write_options.sync) { default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED, diff --git a/db/write_batch.cc b/db/write_batch.cc index 925a05efdd..e6e1acab37 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -591,6 +591,7 @@ class MemTableInserter : public WriteBatch::Handler { } return true; } + virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { Status seek_status; @@ -647,8 +648,8 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { + Status DeleteImpl(uint32_t column_family_id, const Slice& key, + ValueType delete_type) { Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; @@ -671,40 +672,20 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } } - mem->Add(sequence_, kTypeDeletion, key, Slice()); + mem->Add(sequence_, delete_type, key, Slice()); sequence_++; cf_mems_->CheckMemtableFull(); return Status::OK(); } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + return DeleteImpl(column_family_id, key, kTypeDeletion); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { - Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; - return seek_status; - } - MemTable* mem = cf_mems_->GetMemTable(); - auto* moptions = mem->GetMemTableOptions(); - if (!dont_filter_deletes_ && moptions->filter_deletes) { - SnapshotImpl read_from_snapshot; - read_from_snapshot.number_ = sequence_; - ReadOptions ropts; - ropts.snapshot = &read_from_snapshot; - std::string value; - auto cf_handle = cf_mems_->GetColumnFamilyHandle(); - if (cf_handle == nullptr) { - cf_handle = db_->DefaultColumnFamily(); - } - if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { - RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); - return Status::OK(); - } - } - mem->Add(sequence_, kTypeSingleDeletion, key, Slice()); - sequence_++; - cf_mems_->CheckMemtableFull(); - return Status::OK(); + return DeleteImpl(column_family_id, key, kTypeSingleDeletion); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, @@ -791,18 +772,32 @@ class MemTableInserter : public WriteBatch::Handler { // This function can only be called in these conditions: // 1) During Recovery() -// 2) during Write(), in a single-threaded write thread -// The reason is that it calles ColumnFamilyMemTablesImpl::Seek(), which needs -// to be called from a single-threaded write thread (or while holding DB mutex) -Status WriteBatchInternal::InsertInto(const WriteBatch* b, +// 2) During Write(), in a single-threaded write thread +// The reason is that it calls memtables->Seek(), which has a stateful cache +Status WriteBatchInternal::InsertInto(const autovector& batches, + SequenceNumber sequence, ColumnFamilyMemTables* memtables, bool ignore_missing_column_families, uint64_t log_number, DB* db, const bool dont_filter_deletes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, + MemTableInserter inserter(sequence, memtables, ignore_missing_column_families, + log_number, db, dont_filter_deletes); + Status rv = Status::OK(); + for (size_t i = 0; i < batches.size() && rv.ok(); ++i) { + rv = batches[i]->Iterate(&inserter); + } + return rv; +} + +Status WriteBatchInternal::InsertInto(const WriteBatch* batch, + ColumnFamilyMemTables* memtables, + bool ignore_missing_column_families, + uint64_t log_number, DB* db, + const bool dont_filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, ignore_missing_column_families, log_number, db, dont_filter_deletes); - return b->Iterate(&inserter); + return batch->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { @@ -821,4 +816,13 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { std::memory_order_relaxed); } +size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, + size_t rightByteSize) { + if (leftByteSize == 0 || rightByteSize == 0) { + return leftByteSize + rightByteSize; + } else { + return leftByteSize + rightByteSize - kHeader; + } +} + } // namespace rocksdb diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 04db461a06..3ae4edc7a8 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -12,6 +12,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "util/autovector.h" namespace rocksdb { @@ -112,17 +113,28 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); - // Inserts batch entries into memtable - // If dont_filter_deletes is false AND options.filter_deletes is true, - // then --> Drops deletes in batch if db->KeyMayExist returns false - // If ignore_missing_column_families == true. WriteBatch referencing - // non-existing column family should be ignored. - // However, if ignore_missing_column_families == false, any WriteBatch - // referencing non-existing column family will return a InvalidArgument() - // failure. + // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. + // + // If dont_filter_deletes is false AND options.filter_deletes is true + // AND db->KeyMayExist is false, then a Delete won't modify the memtable. + // + // If ignore_missing_column_families == true. WriteBatch + // referencing non-existing column family will be ignored. + // If ignore_missing_column_families == false, processing of the + // batches will be stopped if a reference is found to a non-existing + // column family and InvalidArgument() will be returned. The writes + // in batches may be only partially applied at that point. // // If log_number is non-zero, the memtable will be updated only if - // memtables->GetLogNumber() >= log_number + // memtables->GetLogNumber() >= log_number. + static Status InsertInto(const autovector& batches, + SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + bool ignore_missing_column_families = false, + uint64_t log_number = 0, DB* db = nullptr, + const bool dont_filter_deletes = true); + + // Convenience form of InsertInto when you have only one batch static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, bool ignore_missing_column_families = false, @@ -130,6 +142,10 @@ class WriteBatchInternal { const bool dont_filter_deletes = true); static void Append(WriteBatch* dst, const WriteBatch* src); + + // Returns the byte size of appending a WriteBatch with ByteSize + // leftByteSize and a WriteBatch with ByteSize rightByteSize + static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); }; } // namespace rocksdb From 5ac16300b010281d49140b61e562a5bba119489f Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 12 Nov 2015 14:12:27 -0800 Subject: [PATCH 9/9] Fixed valgrind error in options_util_test Summary: Fixed valgrind error in options_util_test by deleting the compaction_filter allocated from RandomInitCFOptions(). Test Plan: valgrind --error-exitcode=2 --leak-check=full ./options_util_test Reviewers: anthony, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D50661 --- utilities/options/options_util_test.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/utilities/options/options_util_test.cc b/utilities/options/options_util_test.cc index 0ad29fb4b8..a017f56005 100644 --- a/utilities/options/options_util_test.cc +++ b/utilities/options/options_util_test.cc @@ -80,6 +80,12 @@ TEST_F(OptionsUtilTest, SaveAndLoad) { ASSERT_NOK(RocksDBOptionsParser::VerifyCFOptions( cf_opts[i], loaded_cf_descs[i].options)); } + + for (size_t i = 0; i < kCFCount; ++i) { + if (cf_opts[i].compaction_filter) { + delete cf_opts[i].compaction_filter; + } + } } } // namespace rocksdb