diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c3c77f849..4b976608b3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -607,6 +607,7 @@ set(SOURCES db/version_edit.cc db/version_edit_handler.cc db/version_set.cc + db/wal_edit.cc db/wal_manager.cc db/write_batch.cc db/write_batch_base.cc @@ -1078,6 +1079,7 @@ if(WITH_TESTS) db/version_edit_test.cc db/version_set_test.cc db/wal_manager_test.cc + db/wal_edit_test.cc db/write_batch_test.cc db/write_callback_test.cc db/write_controller_test.cc diff --git a/Makefile b/Makefile index e69ac2639c..d5d4f8388f 100644 --- a/Makefile +++ b/Makefile @@ -1525,6 +1525,9 @@ compact_on_deletion_collector_test: $(OBJ_DIR)/utilities/table_properties_collec wal_manager_test: $(OBJ_DIR)/db/wal_manager_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 090dfdb32b..f82ec8bcfc 100644 --- a/TARGETS +++ b/TARGETS @@ -183,6 +183,7 @@ cpp_library( "db/version_edit.cc", "db/version_edit_handler.cc", "db/version_set.cc", + "db/wal_edit.cc", "db/wal_manager.cc", "db/write_batch.cc", "db/write_batch_base.cc", diff --git a/db/version_edit.cc b/db/version_edit.cc index 528c03c953..97c58a1535 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -21,63 +21,6 @@ namespace ROCKSDB_NAMESPACE { namespace { -// Tag numbers for serialized VersionEdit. These numbers are written to -// disk and should not be changed. The number should be forward compatible so -// users can down-grade RocksDB safely. A future Tag is ignored by doing '&' -// between Tag and kTagSafeIgnoreMask field. -enum Tag : uint32_t { - kComparator = 1, - kLogNumber = 2, - kNextFileNumber = 3, - kLastSequence = 4, - kCompactPointer = 5, - kDeletedFile = 6, - kNewFile = 7, - // 8 was used for large value refs - kPrevLogNumber = 9, - kMinLogNumberToKeep = 10, - - // these are new formats divergent from open source leveldb - kNewFile2 = 100, - kNewFile3 = 102, - kNewFile4 = 103, // 4th (the latest) format version of adding files - kColumnFamily = 200, // specify column family for version edit - kColumnFamilyAdd = 201, - kColumnFamilyDrop = 202, - kMaxColumnFamily = 203, - - kInAtomicGroup = 300, - - // Mask for an unidentified tag from the future which can be safely ignored. - kTagSafeIgnoreMask = 1 << 13, - - // Forward compatible (aka ignorable) records - kDbId, - kBlobFileAddition, - kBlobFileGarbage, -}; - -enum NewFileCustomTag : uint32_t { - kTerminate = 1, // The end of customized fields - kNeedCompaction = 2, - // Since Manifest is not entirely forward-compatible, we currently encode - // kMinLogNumberToKeep as part of NewFile as a hack. This should be removed - // when manifest becomes forward-comptabile. - kMinLogNumberToKeepHack = 3, - kOldestBlobFileNumber = 4, - kOldestAncesterTime = 5, - kFileCreationTime = 6, - kFileChecksum = 7, - kFileChecksumFuncName = 8, - - // If this bit for the custom tag is set, opening DB should fail if - // we don't know this field. - kCustomTagNonSafeIgnoreMask = 1 << 6, - - // Forward incompatible (aka unignorable) fields - kPathId, -}; - } // anonymous namespace uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { @@ -150,6 +93,8 @@ void VersionEdit::Clear() { new_files_.clear(); blob_file_additions_.clear(); blob_file_garbages_.clear(); + wal_additions_.clear(); + wal_deletions_.clear(); column_family_ = 0; is_column_family_add_ = false; is_column_family_drop_ = false; @@ -284,6 +229,16 @@ bool VersionEdit::EncodeTo(std::string* dst) const { blob_file_garbage.EncodeTo(dst); } + for (const auto& wal_addition : wal_additions_) { + PutVarint32(dst, kWalAddition); + wal_addition.EncodeTo(dst); + } + + for (const auto& wal_deletion : wal_deletions_) { + PutVarint32(dst, kWalDeletion); + wal_deletion.EncodeTo(dst); + } + // 0 is default and does not need to be explicitly written if (column_family_ != 0) { PutVarint32Varint32(dst, kColumnFamily, column_family_); @@ -608,6 +563,28 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; } + case kWalAddition: { + WalAddition wal_addition; + const Status s = wal_addition.DecodeFrom(&input); + if (!s.ok()) { + return s; + } + + wal_additions_.emplace_back(std::move(wal_addition)); + break; + } + + case kWalDeletion: { + WalDeletion wal_deletion; + const Status s = wal_deletion.DecodeFrom(&input); + if (!s.ok()) { + return s; + } + + wal_deletions_.emplace_back(std::move(wal_deletion)); + break; + } + case kColumnFamily: if (!GetVarint32(&input, &column_family_)) { if (!msg) { @@ -748,6 +725,16 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append(blob_file_garbage.DebugString()); } + for (const auto& wal_addition : wal_additions_) { + r.append("\n WalAddition: "); + r.append(wal_addition.DebugString()); + } + + for (const auto& wal_deletion : wal_deletions_) { + r.append("\n WalDeletion: "); + r.append(wal_deletion.DebugString()); + } + r.append("\n ColumnFamily: "); AppendNumberTo(&r, column_family_); if (is_column_family_add_) { @@ -858,6 +845,34 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { jw.EndArray(); } + if (!wal_additions_.empty()) { + jw << "WalAdditions"; + + jw.StartArray(); + + for (const auto& wal_addition : wal_additions_) { + jw.StartArrayedObject(); + jw << wal_addition; + jw.EndArrayedObject(); + } + + jw.EndArray(); + } + + if (!wal_deletions_.empty()) { + jw << "WalDeletions"; + + jw.StartArray(); + + for (const auto& wal_deletion : wal_deletions_) { + jw.StartArrayedObject(); + jw << wal_deletion; + jw.EndArrayedObject(); + } + + jw.EndArray(); + } + jw << "ColumnFamily" << column_family_; if (is_column_family_add_) { diff --git a/db/version_edit.h b/db/version_edit.h index 0ca0228548..f93092eb0d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -13,9 +13,11 @@ #include #include #include + #include "db/blob/blob_file_addition.h" #include "db/blob/blob_file_garbage.h" #include "db/dbformat.h" +#include "db/wal_edit.h" #include "memory/arena.h" #include "rocksdb/cache.h" #include "table/table_reader.h" @@ -23,6 +25,65 @@ namespace ROCKSDB_NAMESPACE { +// Tag numbers for serialized VersionEdit. These numbers are written to +// disk and should not be changed. The number should be forward compatible so +// users can down-grade RocksDB safely. A future Tag is ignored by doing '&' +// between Tag and kTagSafeIgnoreMask field. +enum Tag : uint32_t { + kComparator = 1, + kLogNumber = 2, + kNextFileNumber = 3, + kLastSequence = 4, + kCompactPointer = 5, + kDeletedFile = 6, + kNewFile = 7, + // 8 was used for large value refs + kPrevLogNumber = 9, + kMinLogNumberToKeep = 10, + + // these are new formats divergent from open source leveldb + kNewFile2 = 100, + kNewFile3 = 102, + kNewFile4 = 103, // 4th (the latest) format version of adding files + kColumnFamily = 200, // specify column family for version edit + kColumnFamilyAdd = 201, + kColumnFamilyDrop = 202, + kMaxColumnFamily = 203, + + kInAtomicGroup = 300, + + // Mask for an unidentified tag from the future which can be safely ignored. + kTagSafeIgnoreMask = 1 << 13, + + // Forward compatible (aka ignorable) records + kDbId, + kBlobFileAddition, + kBlobFileGarbage, + kWalAddition, + kWalDeletion, +}; + +enum NewFileCustomTag : uint32_t { + kTerminate = 1, // The end of customized fields + kNeedCompaction = 2, + // Since Manifest is not entirely forward-compatible, we currently encode + // kMinLogNumberToKeep as part of NewFile as a hack. This should be removed + // when manifest becomes forward-comptabile. + kMinLogNumberToKeepHack = 3, + kOldestBlobFileNumber = 4, + kOldestAncesterTime = 5, + kFileCreationTime = 6, + kFileChecksum = 7, + kFileChecksumFuncName = 8, + + // If this bit for the custom tag is set, opening DB should fail if + // we don't know this field. + kCustomTagNonSafeIgnoreMask = 1 << 6, + + // Forward incompatible (aka unignorable) fields + kPathId, +}; + class VersionSet; constexpr uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; @@ -374,10 +435,29 @@ class VersionEdit { return blob_file_garbages_; } + // Add a WAL (either just created or closed). + void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) { + wal_additions_.emplace_back(number, std::move(metadata)); + } + + // Retrieve all the added WALs. + const WalAdditions& GetWalAdditions() const { return wal_additions_; } + + bool HasWalAddition() const { return !wal_additions_.empty(); } + + // Delete a WAL (either directly deleted or archived). + void DeleteWal(WalNumber number) { wal_deletions_.emplace_back(number); } + + // Retrieve all the deleted WALs. + const WalDeletions& GetWalDeletions() const { return wal_deletions_; } + + bool HasWalDeletion() const { return !wal_deletions_.empty(); } + // Number of edits size_t NumEntries() const { return new_files_.size() + deleted_files_.size() + - blob_file_additions_.size() + blob_file_garbages_.size(); + blob_file_additions_.size() + blob_file_garbages_.size() + + wal_additions_.size() + wal_deletions_.size(); } void SetColumnFamily(uint32_t column_family_id) { @@ -457,6 +537,9 @@ class VersionEdit { BlobFileAdditions blob_file_additions_; BlobFileGarbages blob_file_garbages_; + WalAdditions wal_additions_; + WalDeletions wal_deletions_; + // Each version edit record should have column_family_ set // If it's not set, it is default (0) uint32_t column_family_ = 0; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 056f4adaf5..e073f5439f 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -309,6 +309,211 @@ TEST_F(VersionEditTest, BlobFileAdditionAndGarbage) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, AddWalEncodeDecode) { + VersionEdit edit; + for (uint64_t log_number = 1; log_number <= 20; log_number++) { + WalMetadata meta(rand() % 100); + bool has_size = rand() % 2 == 0; + if (has_size) { + meta.SetSizeInBytes(rand() % 1000); + } + edit.AddWal(log_number, meta); + } + TestEncodeDecode(edit); +} + +TEST_F(VersionEditTest, AddWalDecodeBadLogNumber) { + std::string encoded; + PutVarint32(&encoded, Tag::kWalAddition); + + { + // No log number. + VersionEdit edit; + Status s = edit.DecodeFrom(encoded); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != + std::string::npos) + << s.ToString(); + } + + { + // log number should be varint64, + // but we only encode 128 which is not a valid representation of varint64. + char c = 0; + unsigned char* ptr = reinterpret_cast(&c); + *ptr = 128; + encoded.append(1, c); + VersionEdit edit; + Status s = edit.DecodeFrom(encoded); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("Error decoding WAL log number") != + std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionEditTest, AddWalDecodeBadTag) { + constexpr WalNumber kLogNumber = 100; + constexpr uint64_t kSizeInBytes = 100; + + std::string encoded_without_tag; + PutVarint32(&encoded_without_tag, Tag::kWalAddition); + PutVarint64(&encoded_without_tag, kLogNumber); + + { + // No tag. + VersionEdit edit; + Status s = edit.DecodeFrom(encoded_without_tag); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) + << s.ToString(); + } + + { + // Only has size tag, no terminate tag. + std::string encoded_with_size = encoded_without_tag; + PutVarint32(&encoded_with_size, + static_cast(WalAdditionTag::kSize)); + PutVarint64(&encoded_with_size, kSizeInBytes); + VersionEdit edit; + Status s = edit.DecodeFrom(encoded_with_size); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) + << s.ToString(); + } + + { + // Only has terminate tag. + std::string encoded_with_terminate = encoded_without_tag; + PutVarint32(&encoded_with_terminate, + static_cast(WalAdditionTag::kTerminate)); + VersionEdit edit; + ASSERT_OK(edit.DecodeFrom(encoded_with_terminate)); + auto& wal_addition = edit.GetWalAdditions()[0]; + ASSERT_EQ(wal_addition.GetLogNumber(), kLogNumber); + ASSERT_FALSE(wal_addition.GetMetadata().HasSize()); + } +} + +TEST_F(VersionEditTest, AddWalDecodeNoSize) { + constexpr WalNumber kLogNumber = 100; + + std::string encoded; + PutVarint32(&encoded, Tag::kWalAddition); + PutVarint64(&encoded, kLogNumber); + PutVarint32(&encoded, static_cast(WalAdditionTag::kSize)); + // No real size after the size tag. + + { + // Without terminate tag. + VersionEdit edit; + Status s = edit.DecodeFrom(encoded); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("Error decoding WAL file size") != + std::string::npos) + << s.ToString(); + } + + { + // With terminate tag. + PutVarint32(&encoded, static_cast(WalAdditionTag::kTerminate)); + VersionEdit edit; + Status s = edit.DecodeFrom(encoded); + ASSERT_TRUE(s.IsCorruption()); + // The terminate tag is misunderstood as the size. + ASSERT_TRUE(s.ToString().find("Error decoding tag") != std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionEditTest, AddWalDebug) { + constexpr int n = 2; + constexpr std::array kLogNumbers{{10, 20}}; + constexpr std::array kSizeInBytes{{100, 200}}; + + VersionEdit edit; + for (int i = 0; i < n; i++) { + edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i])); + } + + const WalAdditions& wals = edit.GetWalAdditions(); + + ASSERT_TRUE(edit.HasWalAddition()); + ASSERT_EQ(wals.size(), n); + for (int i = 0; i < n; i++) { + const WalAddition& wal = wals[i]; + ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]); + ASSERT_EQ(wal.GetMetadata().GetSizeInBytes(), kSizeInBytes[i]); + } + + std::string expected_str = "VersionEdit {\n"; + for (int i = 0; i < n; i++) { + std::stringstream ss; + ss << " WalAddition: log_number: " << kLogNumbers[i] + << " size_in_bytes: " << kSizeInBytes[i] << "\n"; + expected_str += ss.str(); + } + expected_str += " ColumnFamily: 0\n}\n"; + ASSERT_EQ(edit.DebugString(true), expected_str); + + std::string expected_json = "{\"EditNumber\": 4, \"WalAdditions\": ["; + for (int i = 0; i < n; i++) { + std::stringstream ss; + ss << "{\"LogNumber\": " << kLogNumbers[i] << ", " + << "\"SizeInBytes\": " << kSizeInBytes[i] << "}"; + if (i < n - 1) ss << ", "; + expected_json += ss.str(); + } + expected_json += "], \"ColumnFamily\": 0}"; + ASSERT_EQ(edit.DebugJSON(4, true), expected_json); +} + +TEST_F(VersionEditTest, DeleteWalEncodeDecode) { + VersionEdit edit; + for (uint64_t log_number = 1; log_number <= 20; log_number++) { + edit.DeleteWal(log_number); + } + TestEncodeDecode(edit); +} + +TEST_F(VersionEditTest, DeleteWalDebug) { + constexpr int n = 2; + constexpr std::array kLogNumbers{{10, 20}}; + + VersionEdit edit; + for (int i = 0; i < n; i++) { + edit.DeleteWal(kLogNumbers[i]); + } + + const WalDeletions& wals = edit.GetWalDeletions(); + + ASSERT_TRUE(edit.HasWalDeletion()); + ASSERT_EQ(wals.size(), n); + for (int i = 0; i < n; i++) { + const WalDeletion& wal = wals[i]; + ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]); + } + + std::string expected_str = "VersionEdit {\n"; + for (int i = 0; i < n; i++) { + std::stringstream ss; + ss << " WalDeletion: log_number: " << kLogNumbers[i] << "\n"; + expected_str += ss.str(); + } + expected_str += " ColumnFamily: 0\n}\n"; + ASSERT_EQ(edit.DebugString(true), expected_str); + + std::string expected_json = "{\"EditNumber\": 4, \"WalDeletions\": ["; + for (int i = 0; i < n; i++) { + std::stringstream ss; + ss << "{\"LogNumber\": " << kLogNumbers[i] << "}"; + if (i < n - 1) ss << ", "; + expected_json += ss.str(); + } + expected_json += "], \"ColumnFamily\": 0}"; + ASSERT_EQ(edit.DebugJSON(4, true), expected_json); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 2c8817cae8..beb029d1f6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3696,6 +3696,7 @@ void VersionSet::Reset() { manifest_file_size_ = 0; obsolete_files_.clear(); obsolete_manifests_.clear(); + wals_.Reset(); } void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, diff --git a/db/version_set.h b/db/version_set.h index b36e88620f..3d188d1035 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1185,6 +1185,8 @@ class VersionSet { // Get the IO Status returned by written Manifest. const IOStatus& io_status() const { return io_status_; } + const WalSet& GetWalSet() const { return wals_; } + void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) { assert(cfd); @@ -1271,6 +1273,8 @@ class VersionSet { Status VerifyFileMetadata(const std::string& fpath, const FileMetaData& meta) const; + WalSet wals_; + std::unique_ptr column_family_set_; Env* const env_; FileSystem* const fs_; diff --git a/db/wal_edit.cc b/db/wal_edit.cc new file mode 100644 index 0000000000..d68a98d31e --- /dev/null +++ b/db/wal_edit.cc @@ -0,0 +1,175 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wal_edit.h" + +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +void WalAddition::EncodeTo(std::string* dst) const { + PutVarint64(dst, number_); + + if (metadata_.HasSize()) { + PutVarint32(dst, static_cast(WalAdditionTag::kSize)); + PutVarint64(dst, metadata_.GetSizeInBytes()); + } + + PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); +} + +Status WalAddition::DecodeFrom(Slice* src) { + constexpr char class_name[] = "WalAddition"; + + if (!GetVarint64(src, &number_)) { + return Status::Corruption(class_name, "Error decoding WAL log number"); + } + + while (true) { + uint32_t tag_value = 0; + if (!GetVarint32(src, &tag_value)) { + return Status::Corruption(class_name, "Error decoding tag"); + } + WalAdditionTag tag = static_cast(tag_value); + switch (tag) { + case WalAdditionTag::kSize: { + uint64_t size = 0; + if (!GetVarint64(src, &size)) { + return Status::Corruption(class_name, "Error decoding WAL file size"); + } + metadata_.SetSizeInBytes(size); + break; + } + // TODO: process future tags such as checksum. + case WalAdditionTag::kTerminate: + return Status::OK(); + default: { + std::stringstream ss; + ss << "Unknown tag " << tag_value; + return Status::Corruption(class_name, ss.str()); + } + } + } +} + +JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { + jw << "LogNumber" << wal.GetLogNumber() << "SizeInBytes" + << wal.GetMetadata().GetSizeInBytes(); + return jw; +} + +std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { + os << "log_number: " << wal.GetLogNumber() + << " size_in_bytes: " << wal.GetMetadata().GetSizeInBytes(); + return os; +} + +std::string WalAddition::DebugString() const { + std::ostringstream oss; + oss << *this; + return oss.str(); +} + +void WalDeletion::EncodeTo(std::string* dst) const { + PutVarint64(dst, number_); +} + +Status WalDeletion::DecodeFrom(Slice* src) { + constexpr char class_name[] = "WalDeletion"; + + if (!GetVarint64(src, &number_)) { + return Status::Corruption(class_name, "Error decoding WAL log number"); + } + + return Status::OK(); +} + +JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) { + jw << "LogNumber" << wal.GetLogNumber(); + return jw; +} + +std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) { + os << "log_number: " << wal.GetLogNumber(); + return os; +} + +std::string WalDeletion::DebugString() const { + std::ostringstream oss; + oss << *this; + return oss.str(); +} + +Status WalSet::AddWal(const WalAddition& wal) { + auto it = wals_.lower_bound(wal.GetLogNumber()); + if (wal.GetMetadata().HasSize()) { + // The WAL must exist without size. + if (it == wals_.end() || it->first != wal.GetLogNumber()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " is not created before closing"; + return Status::Corruption("WalSet", ss.str()); + } + if (it->second.HasSize()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " is closed more than once"; + return Status::Corruption("WalSet", ss.str()); + } + it->second = wal.GetMetadata(); + } else { + // The WAL must not exist beforehand. + if (it != wals_.end() && it->first == wal.GetLogNumber()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " is created more than once"; + return Status::Corruption("WalSet", ss.str()); + } + wals_[wal.GetLogNumber()] = wal.GetMetadata(); + } + return Status::OK(); +} + +Status WalSet::AddWals(const WalAdditions& wals) { + Status s; + for (const WalAddition& wal : wals) { + s = AddWal(wal); + if (!s.ok()) { + break; + } + } + return s; +} + +Status WalSet::DeleteWal(const WalDeletion& wal) { + auto it = wals_.lower_bound(wal.GetLogNumber()); + // The WAL must exist and has been closed. + if (it == wals_.end() || it->first != wal.GetLogNumber()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " must exist before deletion"; + return Status::Corruption("WalSet", ss.str()); + } + if (!it->second.HasSize()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion"; + return Status::Corruption("WalSet", ss.str()); + } + wals_.erase(it); + return Status::OK(); +} + +Status WalSet::DeleteWals(const WalDeletions& wals) { + Status s; + for (const WalDeletion& wal : wals) { + s = DeleteWal(wal); + if (!s.ok()) { + break; + } + } + return s; +} + +void WalSet::Reset() { wals_.clear(); } + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/wal_edit.h b/db/wal_edit.h new file mode 100644 index 0000000000..e5b577e1b5 --- /dev/null +++ b/db/wal_edit.h @@ -0,0 +1,143 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +// WAL related classes used in VersionEdit and VersionSet. + +#pragma once + +#include +#include +#include +#include + +#include "logging/event_logger.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +class JSONWriter; +class Slice; +class Status; + +using WalNumber = uint64_t; + +// Metadata of a WAL. +class WalMetadata { + public: + WalMetadata() = default; + + explicit WalMetadata(uint64_t size_bytes) : size_bytes_(size_bytes) {} + + bool HasSize() const { return size_bytes_ != kUnknownWalSize; } + + void SetSizeInBytes(uint64_t bytes) { size_bytes_ = bytes; } + + uint64_t GetSizeInBytes() const { return size_bytes_; } + + private: + // The size of WAL is unknown, used when the WAL is not closed yet. + constexpr static uint64_t kUnknownWalSize = 0; + + // Size of a closed WAL in bytes. + uint64_t size_bytes_ = kUnknownWalSize; +}; + +// These tags are persisted to MANIFEST, so it's part of the user API. +enum class WalAdditionTag : uint32_t { + // Indicates that there are no more tags. + kTerminate = 1, + // Size in bytes. + kSize = 2, + // Add tags in the future, such as checksum? +}; + +// Records the event of adding a WAL in VersionEdit. +class WalAddition { + public: + WalAddition() : number_(0), metadata_() {} + + explicit WalAddition(WalNumber number) : number_(number), metadata_() {} + + WalAddition(WalNumber number, WalMetadata meta) + : number_(number), metadata_(std::move(meta)) {} + + WalNumber GetLogNumber() const { return number_; } + + const WalMetadata& GetMetadata() const { return metadata_; } + + void EncodeTo(std::string* dst) const; + + Status DecodeFrom(Slice* src); + + std::string DebugString() const; + + private: + WalNumber number_; + WalMetadata metadata_; +}; + +std::ostream& operator<<(std::ostream& os, const WalAddition& wal); +JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal); + +using WalAdditions = std::vector; + +// Records the event of deleting/archiving a WAL in VersionEdit. +class WalDeletion { + public: + WalDeletion() : number_(0) {} + + explicit WalDeletion(WalNumber number) : number_(number) {} + + WalNumber GetLogNumber() const { return number_; } + + void EncodeTo(std::string* dst) const; + + Status DecodeFrom(Slice* src); + + std::string DebugString() const; + + private: + WalNumber number_; +}; + +std::ostream& operator<<(std::ostream& os, const WalDeletion& wal); +JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal); + +using WalDeletions = std::vector; + +// Used in VersionSet to keep the current set of WALs. +// +// When a WAL is created, closed, deleted, or archived, +// a VersionEdit is logged to MANIFEST and +// the WAL is added to or deleted from WalSet. +// +// Not thread safe, needs external synchronization such as holding DB mutex. +class WalSet { + public: + // Add WAL(s). + // If the WAL has size, it means the WAL is closed, + // then there must be an existing WAL without size that is added + // when creating the WAL, otherwise, return Status::Corruption. + // Can happen when applying a VersionEdit or recovering from MANIFEST. + Status AddWal(const WalAddition& wal); + Status AddWals(const WalAdditions& wals); + + // Delete WAL(s). + // The WAL to be deleted must exist, otherwise, + // return Status::Corruption. + // Can happen when applying a VersionEdit or recovering from MANIFEST. + Status DeleteWal(const WalDeletion& wal); + Status DeleteWals(const WalDeletions& wals); + + // Resets the internal state. + void Reset(); + + const std::map& GetWals() const { return wals_; } + + private: + std::map wals_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc new file mode 100644 index 0000000000..58b2e80ddd --- /dev/null +++ b/db/wal_edit_test.cc @@ -0,0 +1,127 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wal_edit.h" + +#include "port/port.h" +#include "port/stack_trace.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(WalSet, AddDeleteReset) { + WalSet wals; + ASSERT_TRUE(wals.GetWals().empty()); + + // Create WAL 1 - 10. + for (WalNumber log_number = 1; log_number <= 10; log_number++) { + wals.AddWal(WalAddition(log_number)); + } + ASSERT_EQ(wals.GetWals().size(), 10); + + // Close WAL 1 - 5. + for (WalNumber log_number = 1; log_number <= 5; log_number++) { + wals.AddWal(WalAddition(log_number, WalMetadata(100))); + } + ASSERT_EQ(wals.GetWals().size(), 10); + + // Delete WAL 1 - 5. + for (WalNumber log_number = 1; log_number <= 5; log_number++) { + wals.DeleteWal(WalDeletion(log_number)); + } + ASSERT_EQ(wals.GetWals().size(), 5); + + WalNumber expected_log_number = 6; + for (auto it : wals.GetWals()) { + WalNumber log_number = it.first; + ASSERT_EQ(log_number, expected_log_number++); + } + + wals.Reset(); + ASSERT_TRUE(wals.GetWals().empty()); +} + +TEST(WalSet, Overwrite) { + constexpr WalNumber kNumber = 100; + constexpr uint64_t kBytes = 200; + WalSet wals; + wals.AddWal(WalAddition(kNumber)); + ASSERT_FALSE(wals.GetWals().at(kNumber).HasSize()); + wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); + ASSERT_TRUE(wals.GetWals().at(kNumber).HasSize()); + ASSERT_EQ(wals.GetWals().at(kNumber).GetSizeInBytes(), kBytes); +} + +TEST(WalSet, CreateTwice) { + constexpr WalNumber kNumber = 100; + WalSet wals; + ASSERT_OK(wals.AddWal(WalAddition(kNumber))); + Status s = wals.AddWal(WalAddition(kNumber)); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != + std::string::npos); +} + +TEST(WalSet, CloseTwice) { + constexpr WalNumber kNumber = 100; + constexpr uint64_t kBytes = 200; + WalSet wals; + ASSERT_OK(wals.AddWal(WalAddition(kNumber))); + ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); + Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") != + std::string::npos); +} + +TEST(WalSet, CloseBeforeCreate) { + constexpr WalNumber kNumber = 100; + constexpr uint64_t kBytes = 200; + WalSet wals; + Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") != + std::string::npos); +} + +TEST(WalSet, CreateAfterClose) { + constexpr WalNumber kNumber = 100; + constexpr uint64_t kBytes = 200; + WalSet wals; + ASSERT_OK(wals.AddWal(WalAddition(kNumber))); + ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes)))); + Status s = wals.AddWal(WalAddition(kNumber)); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != + std::string::npos); +} + +TEST(WalSet, DeleteNonExistingWal) { + constexpr WalNumber kNonExistingNumber = 100; + WalSet wals; + Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber)); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 must exist before deletion") != + std::string::npos); +} + +TEST(WalSet, DeleteNonClosedWal) { + constexpr WalNumber kNonExistingNumber = 100; + WalSet wals; + ASSERT_OK(wals.AddWal(WalAddition(kNonExistingNumber))); + Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber)); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 100 must be closed before deletion") != + std::string::npos); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src.mk b/src.mk index 0656b6f0c7..be64083c12 100644 --- a/src.mk +++ b/src.mk @@ -67,6 +67,7 @@ LIB_SOURCES = \ db/version_edit.cc \ db/version_edit_handler.cc \ db/version_set.cc \ + db/wal_edit.cc \ db/wal_manager.cc \ db/write_batch.cc \ db/write_batch_base.cc \