diff --git a/db/db_impl.cc b/db/db_impl.cc
index fc98b2abd8..0708c49a89 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1219,14 +1219,16 @@ Status DBImpl::Recover(
           "flag but a log file already exists");
     }
 
-    // Recover in the order in which the logs were generated
-    std::sort(logs.begin(), logs.end());
-    for (const auto& log : logs) {
-      // The previous incarnation may not have written any MANIFEST
-      // records after allocating this log number. So we manually
-      // update the file number allocation counter in VersionSet.
-      versions_->MarkFileNumberUsed(log);
-      s = RecoverLogFile(log, &max_sequence, read_only);
+    if (!logs.empty()) {
+      // Recover in the order in which the logs were generated
+      std::sort(logs.begin(), logs.end());
+      s = RecoverLogFiles(logs, &max_sequence, read_only);
+      if (!s.ok()) {
+        // Clear memtables if recovery failed
+        for (auto cfd : *versions_->GetColumnFamilySet()) {
+          cfd->CreateNewMemtable();
+        }
+      }
     }
     SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
   }
@@ -1239,8 +1241,9 @@ Status DBImpl::Recover(
   return s;
 }
 
-Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
-                              bool read_only) {
+// REQUIRES: log_numbers are sorted in ascending order
+Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                               SequenceNumber* max_sequence, bool read_only) {
   struct LogReporter : public log::Reader::Reporter {
     Env* env;
     Logger* info_log;
@@ -1256,7 +1259,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
   };
 
   mutex_.AssertHeld();
-
+  Status status;
   std::unordered_map<uint32_t, VersionEdit> version_edits;
   // no need to refcount because iteration is under mutex
   for (auto cfd : *versions_->GetColumnFamilySet()) {
@@ -1265,102 +1268,113 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
     version_edits.insert({cfd->GetID(), edit});
   }
 
-  // Open the log file
-  std::string fname = LogFileName(db_options_.wal_dir, log_number);
-  unique_ptr<SequentialFile> file;
-  Status status = env_->NewSequentialFile(fname, &file, env_options_);
-  if (!status.ok()) {
-    MaybeIgnoreError(&status);
-    return status;
-  }
-
-  // Create the log reader.
-  LogReporter reporter;
-  reporter.env = env_;
-  reporter.info_log = db_options_.info_log.get();
-  reporter.fname = fname.c_str();
-  reporter.status = (db_options_.paranoid_checks &&
-                     !db_options_.skip_log_error_on_recovery ? &status
-                                                             : nullptr);
-  // We intentially make log::Reader do checksumming even if
-  // paranoid_checks==false so that corruptions cause entire commits
-  // to be skipped instead of propagating bad information (like overly
-  // large sequence numbers).
-  log::Reader reader(std::move(file), &reporter, true/*checksum*/,
-                     0/*initial_offset*/);
-  Log(db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);
-
-  // Read all the records and add to a memtable
-  std::string scratch;
-  Slice record;
-  WriteBatch batch;
-  while (reader.ReadRecord(&record, &scratch)) {
-    if (record.size() < 12) {
-      reporter.Corruption(record.size(),
-                          Status::Corruption("log record too small"));
-      continue;
-    }
-    WriteBatchInternal::SetContents(&batch, record);
-
-    // If column family was not found, it might mean that the WAL write
-    // batch references to the column family that was dropped after the
-    // insert. We don't want to fail the whole write batch in that case -- we
-    // just ignore the update.
-    // That's why we set ignore missing column families to true
-    status = WriteBatchInternal::InsertInto(
-        &batch, column_family_memtables_.get(),
-        true /* ignore missing column families */, log_number);
-
-    MaybeIgnoreError(&status);
+  for (auto log_number : log_numbers) {
+    // The previous incarnation may not have written any MANIFEST
+    // records after allocating this log number. So we manually
+    // update the file number allocation counter in VersionSet.
+    versions_->MarkFileNumberUsed(log_number);
+    // Open the log file
+    std::string fname = LogFileName(db_options_.wal_dir, log_number);
+    unique_ptr<SequentialFile> file;
+    status = env_->NewSequentialFile(fname, &file, env_options_);
     if (!status.ok()) {
-      return status;
-    }
-    const SequenceNumber last_seq =
-        WriteBatchInternal::Sequence(&batch) +
-        WriteBatchInternal::Count(&batch) - 1;
-    if (last_seq > *max_sequence) {
-      *max_sequence = last_seq;
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        return status;
+      } else {
+        // Fail with one log file, but that's ok.
+        // Try next one.
+        continue;
+      }
     }
 
-    if (!read_only) {
-      // no need to refcount since client still doesn't have access
-      // to the DB and can not drop column families while we iterate
-      for (auto cfd : *versions_->GetColumnFamilySet()) {
-        if (cfd->mem()->ShouldFlush()) {
-          // If this asserts, it means that InsertInto failed in
-          // filtering updates to already-flushed column families
-          assert(cfd->GetLogNumber() <= log_number);
-          auto iter = version_edits.find(cfd->GetID());
-          assert(iter != version_edits.end());
-          VersionEdit* edit = &iter->second;
-          status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
-          // we still want to clear the memtable, even if the recovery failed
-          cfd->CreateNewMemtable();
-          if (!status.ok()) {
-            // Reflect errors immediately so that conditions like full
-            // file-systems cause the DB::Open() to fail.
-            return status;
+    // Create the log reader.
+    LogReporter reporter;
+    reporter.env = env_;
+    reporter.info_log = db_options_.info_log.get();
+    reporter.fname = fname.c_str();
+    reporter.status =
+        (db_options_.paranoid_checks && !db_options_.skip_log_error_on_recovery
+             ? &status
+             : nullptr);
+    // We intentionally make log::Reader do checksumming even if
+    // paranoid_checks==false so that corruptions cause entire commits
+    // to be skipped instead of propagating bad information (like overly
+    // large sequence numbers).
+    log::Reader reader(std::move(file), &reporter, true /*checksum*/,
+                       0 /*initial_offset*/);
+    Log(db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);
+
+    // Read all the records and add to a memtable
+    std::string scratch;
+    Slice record;
+    WriteBatch batch;
+    while (reader.ReadRecord(&record, &scratch)) {
+      if (record.size() < 12) {
+        reporter.Corruption(record.size(),
+                            Status::Corruption("log record too small"));
+        continue;
+      }
+      WriteBatchInternal::SetContents(&batch, record);
+
+      // If column family was not found, it might mean that the WAL write
+      // batch references a column family that was dropped after the
+      // insert. We don't want to fail the whole write batch in that case --
+      // we just ignore the update.
+      // That's why we set ignore missing column families to true
+      status = WriteBatchInternal::InsertInto(
+          &batch, column_family_memtables_.get(), true, log_number);
+
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        return status;
+      }
+      const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
+                                      WriteBatchInternal::Count(&batch) - 1;
+      if (last_seq > *max_sequence) {
+        *max_sequence = last_seq;
+      }
+
+      if (!read_only) {
+        // no need to refcount since client still doesn't have access
+        // to the DB and can not drop column families while we iterate
+        for (auto cfd : *versions_->GetColumnFamilySet()) {
+          if (cfd->mem()->ShouldFlush()) {
+            // If this asserts, it means that InsertInto failed in
+            // filtering updates to already-flushed column families
+            assert(cfd->GetLogNumber() <= log_number);
+            auto iter = version_edits.find(cfd->GetID());
+            assert(iter != version_edits.end());
+            VersionEdit* edit = &iter->second;
+            status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
+            if (!status.ok()) {
+              // Reflect errors immediately so that conditions like full
+              // file-systems cause the DB::Open() to fail.
+              return status;
+            }
+            cfd->CreateNewMemtable();
           }
         }
       }
     }
-  }
 
-  if (versions_->LastSequence() < *max_sequence) {
-    versions_->SetLastSequence(*max_sequence);
+    if (versions_->LastSequence() < *max_sequence) {
+      versions_->SetLastSequence(*max_sequence);
+    }
   }
 
   if (!read_only) {
     // no need to refcount since client still doesn't have access
     // to the DB and can not drop column families while we iterate
+    auto max_log_number = log_numbers.back();
     for (auto cfd : *versions_->GetColumnFamilySet()) {
       auto iter = version_edits.find(cfd->GetID());
       assert(iter != version_edits.end());
       VersionEdit* edit = &iter->second;
-      if (cfd->GetLogNumber() > log_number) {
+      if (cfd->GetLogNumber() > max_log_number) {
         // Column family cfd has already flushed the data
-        // from log_number. Memtable has to be empty because
+        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
@@ -1371,28 +1385,29 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
       // flush the final memtable (if non-empty)
       if (cfd->mem()->GetFirstSequenceNumber() != 0) {
         status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
-      }
-      // we still want to clear the memtable, even if the recovery failed
-      cfd->CreateNewMemtable();
-      if (!status.ok()) {
-        return status;
+        if (!status.ok()) {
+          // Recovery failed
+          break;
+        }
+        cfd->CreateNewMemtable();
       }
 
       // write MANIFEST with update
-      // writing log number in the manifest means that any log file
+      // writing log_number in the manifest means that any log file
       // with number strongly less than (log_number + 1) is already
       // recovered and should be ignored on next reincarnation.
-      // Since we already recovered log_number, we want all logs
-      // with numbers `<= log_number` (includes this one) to be ignored
-      edit->SetLogNumber(log_number + 1);
+      // Since we already recovered max_log_number, we want all logs
+      // with numbers `<= max_log_number` (including this one) to be ignored
+      edit->SetLogNumber(max_log_number + 1);
       // we must mark the next log number as used, even though it's
       // not actually used. that is because VersionSet assumes
       // VersionSet::next_file_number_ always to be strictly greater than any
       // log number
-      versions_->MarkFileNumberUsed(log_number + 1);
+      versions_->MarkFileNumberUsed(max_log_number + 1);
       status = versions_->LogAndApply(cfd, edit, &mutex_);
       if (!status.ok()) {
-        return status;
+        // Recovery failed
+        break;
       }
     }
   }
diff --git a/db/db_impl.h b/db/db_impl.h
index 4d6ba04953..cf7914fece 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -344,8 +344,9 @@ class DBImpl : public DB {
                                DeletionState& deletion_state,
                                LogBuffer* log_buffer);
 
-  Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
-                        bool read_only);
+  // REQUIRES: log_numbers are sorted in ascending order
+  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                         SequenceNumber* max_sequence, bool read_only);
 
   // The following two methods are used to flush a memtable to
   // storage. The first one is used at database RecoveryTime (when the
diff --git a/db/db_test.cc b/db/db_test.cc
index 96f7e208aa..92d623468b 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -6120,18 +6120,18 @@ namespace {
 std::vector<uint64_t> ListSpecificFiles(
     Env* env, const std::string& path, const FileType expected_file_type) {
   std::vector<std::string> files;
-  std::vector<uint64_t> log_files;
+  std::vector<uint64_t> file_numbers;
   env->GetChildren(path, &files);
   uint64_t number;
   FileType type;
   for (size_t i = 0; i < files.size(); ++i) {
     if (ParseFileName(files[i], &number, &type)) {
       if (type == expected_file_type) {
-        log_files.push_back(number);
+        file_numbers.push_back(number);
       }
     }
   }
-  return std::move(log_files);
+  return std::move(file_numbers);
 }
 
 std::vector<uint64_t> ListLogFiles(Env* env, const std::string& path) {
@@ -6141,6 +6141,17 @@ std::vector<uint64_t> ListLogFiles(Env* env, const std::string& path) {
 std::vector<uint64_t> ListTableFiles(Env* env, const std::string& path) {
   return ListSpecificFiles(env, path, kTableFile);
 }
+
+std::uint64_t GetNumberOfSstFilesForColumnFamily(
+    DB* db, std::string column_family_name) {
+  std::vector<LiveFileMetaData> metadata;
+  db->GetLiveFilesMetaData(&metadata);
+  uint64_t result = 0;
+  for (auto& fileMetadata : metadata) {
+    result += (fileMetadata.column_family_name == column_family_name);
+  }
+  return result;
+}
 }  // namespace
 
 TEST(DBTest, FlushOneColumnFamily) {
@@ -6165,6 +6176,119 @@ TEST(DBTest, FlushOneColumnFamily) {
   }
 }
 
+// In https://reviews.facebook.net/D20661 we change
+// recovery behavior: previously for each log file each column family
+// memtable was flushed, even if it was empty.
+// Now it's changed: we try to create the smallest number of table files
+// by merging updates from multiple logs
+TEST(DBTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
+  Options options;
+  options.write_buffer_size = 5000000;
+  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, &options);
+
+  // Since we will reopen DB with smaller write_buffer_size,
+  // each key will go to a new SST file
+  ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
+  ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
+  ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
+  ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
+
+  ASSERT_OK(Put(3, Key(10), DummyString(1)));
+  // Make 'dobrynia' flush and a new WAL file be created
+  ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
+  ASSERT_OK(Put(2, Key(1), DummyString(1)));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+  {
+    auto tables = ListTableFiles(env_, dbname_);
+    ASSERT_EQ(tables.size(), 1);
+    // Make sure 'dobrynia' was flushed: check the number of SST files
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), 1);
+  }
+  // New WAL file
+  ASSERT_OK(Put(1, Key(1), DummyString(1)));
+  ASSERT_OK(Put(1, Key(1), DummyString(1)));
+  ASSERT_OK(Put(3, Key(10), DummyString(1)));
+  ASSERT_OK(Put(3, Key(10), DummyString(1)));
+  ASSERT_OK(Put(3, Key(10), DummyString(1)));
+
+  options.write_buffer_size = 10;
+  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
+                           &options);
+  {
+    // No inserts => default is empty
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), 0);
+    // First 4 keys go to separate SSTs + 1 more SST for 2 smaller keys
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 5);
+    // 1 SST for the big key + 1 SST for the small one
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), 2);
+    // 1 SST for all keys
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), 1);
+  }
+}
+
+// In https://reviews.facebook.net/D20661 we change
+// recovery behavior: previously for each log file each column family
+// memtable was flushed, even if it was empty.
+// Now it's changed: we try to create the smallest number of table files
+// by merging updates from multiple logs
+TEST(DBTest, RecoverCheckFileAmount) {
+  Options options;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, &options);
+
+  ASSERT_OK(Put(0, Key(1), DummyString(1)));
+  ASSERT_OK(Put(1, Key(1), DummyString(1)));
+  ASSERT_OK(Put(2, Key(1), DummyString(1)));
+
+  // Make the 'nikitich' memtable flush
+  ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
+  ASSERT_OK(Put(3, Key(1), DummyString(1)));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+  // 4 memtables are not flushed, 1 sst file
+  {
+    auto tables = ListTableFiles(env_, dbname_);
+    ASSERT_EQ(tables.size(), 1);
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), 1);
+  }
+  // Memtable for 'nikitich' has been flushed, a new WAL file has been opened;
+  // 4 memtables are still not flushed
+
+  // Write to the new WAL file
+  ASSERT_OK(Put(0, Key(1), DummyString(1)));
+  ASSERT_OK(Put(1, Key(1), DummyString(1)));
+  ASSERT_OK(Put(2, Key(1), DummyString(1)));
+
+  // Fill up 'nikitich' one more time
+  ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
+  // make it flush
+  ASSERT_OK(Put(3, Key(1), DummyString(1)));
+  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+  // There are still 4 memtables not flushed, and 2 sst files
+  ASSERT_OK(Put(0, Key(1), DummyString(1)));
+  ASSERT_OK(Put(1, Key(1), DummyString(1)));
+  ASSERT_OK(Put(2, Key(1), DummyString(1)));
+
+  {
+    auto tables = ListTableFiles(env_, dbname_);
+    ASSERT_EQ(tables.size(), 2);
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), 2);
+  }
+
+  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
+                           &options);
+  {
+    std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
+    // Check that records for 'default', 'dobrynia' and 'pikachu' from the
+    // first, second and third WALs went to the same SST.
+    // So there are 6 SSTs: three for 'nikitich', one for 'default', one for
+    // 'dobrynia', one for 'pikachu'
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), 1);
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), 3);
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), 1);
+    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1);
+  }
+}
+
 TEST(DBTest, WALArchivalTtl) {
   do {
     Options options = CurrentOptions();