From f837f5b1c95a43f72d1f5257cd13c701c2e65b51 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Thu, 24 Oct 2013 19:09:02 -0700 Subject: [PATCH] Making the transaction log iterator more robust Summary: strict essentially means that we MUST find the startsequence. Thus we should return if starteSequence is not found in the first file in case strict is set. This will take care of ending the iterator in case of permanent gaps due to corruptions in the log files Also created NextImpl function that will have internal variable to distinguish whether Next is being called from StartSequence or by application. Set NotFoudn::gaps status to give an indication of gaps happeneing. Polished the inline documentation at various places Test Plan: * db_repl_stress test * db_test relating to transaction log iterator * fbcode/wormhole/rocksdb/rocks_log_iterator * sigma production machine sigmafio032.prn1 Reviewers: dhruba Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13689 --- db/db_impl.cc | 11 +--- db/db_impl.h | 6 +- db/db_test.cc | 56 ++++++++++++++++--- db/transaction_log_impl.cc | 92 ++++++++++++++++++------------- db/transaction_log_impl.h | 16 ++++-- include/rocksdb/db.h | 20 +++---- include/rocksdb/transaction_log.h | 6 +- include/utilities/stackable_db.h | 2 +- utilities/ttl/db_ttl.cc | 2 +- utilities/ttl/db_ttl.h | 2 +- 10 files changed, 131 insertions(+), 82 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index d214a55c8a..32985a9ca1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -274,7 +274,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) flush_on_destroy_(false), stats_(options.num_levels), delayed_writes_(0), - last_flushed_sequence_(0), storage_options_(options), bg_work_gate_closed_(false), refitting_level_(false) { @@ -743,9 +742,6 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, if (s.ok()) { if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); - last_flushed_sequence_ = max_sequence; - } else { - last_flushed_sequence_ = versions_->LastSequence(); } SetTickerCount(options_.statistics, SEQUENCE_NUMBER, versions_->LastSequence()); @@ -1180,7 +1176,7 @@ Status DBImpl::Flush(const FlushOptions& options) { return status; } -SequenceNumber DBImpl::GetLatestSequenceNumber() { +SequenceNumber DBImpl::GetLatestSequenceNumber() const { return versions_->LastSequence(); } @@ -1188,7 +1184,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, unique_ptr* iter) { RecordTick(options_.statistics, GET_UPDATES_SINCE_CALLS); - if (seq > last_flushed_sequence_) { + if (seq > versions_->LastSequence()) { return Status::IOError("Requested sequence not yet written in the db"); } // Get all sorted Wal Files. @@ -1210,7 +1206,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, storage_options_, seq, std::move(wal_files), - &last_flushed_sequence_)); + this)); return (*iter)->status(); } @@ -2682,7 +2678,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { mutex_.Lock(); if (status.ok()) { versions_->SetLastSequence(last_sequence); - last_flushed_sequence_ = current_sequence; } } if (updates == &tmp_batch_) tmp_batch_.Clear(); diff --git a/db/db_impl.h b/db/db_impl.h index 5654e17737..3b6be83734 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -73,7 +73,7 @@ class DBImpl : public DB { uint64_t* manifest_file_size, bool flush_memtable = true); virtual Status GetSortedWalFiles(VectorLogPtr& files); - virtual SequenceNumber GetLatestSequenceNumber(); + virtual SequenceNumber GetLatestSequenceNumber() const; virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter); virtual Status DeleteFile(std::string name); @@ -402,10 +402,6 @@ class DBImpl : public DB { // count of the number of contiguous delaying writes int delayed_writes_; - // store the last flushed sequence. - // Used by transaction log iterator. - SequenceNumber last_flushed_sequence_; - // The options to access storage files const EnvOptions storage_options_; diff --git a/db/db_test.cc b/db/db_test.cc index d17467e65b..5bcbde8039 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -660,7 +660,7 @@ class DBTest { } std::unique_ptr OpenTransactionLogIter( - const SequenceNumber seq) { + const SequenceNumber seq) { unique_ptr iter; Status status = dbfull()->GetUpdatesSince(seq, &iter); ASSERT_OK(status); @@ -3876,20 +3876,29 @@ TEST(DBTest, WALClear) { } while (ChangeCompactOptions()); } -void ExpectRecords( - const int expected_no_records, - std::unique_ptr& iter) { - int i = 0; +SequenceNumber ReadRecords( + std::unique_ptr& iter, + int& count) { + count = 0; SequenceNumber lastSequence = 0; + BatchResult res; while (iter->Valid()) { - BatchResult res = iter->GetBatch(); + res = iter->GetBatch(); ASSERT_TRUE(res.sequence > lastSequence); - ++i; + ++count; lastSequence = res.sequence; ASSERT_OK(iter->status()); iter->Next(); } - ASSERT_EQ(i, expected_no_records); + return res.sequence; +} + +void ExpectRecords( + const int expected_no_records, + std::unique_ptr& iter) { + int num_records; + ReadRecords(iter, num_records); + ASSERT_EQ(num_records, expected_no_records); } TEST(DBTest, TransactionLogIterator) { @@ -3976,6 +3985,35 @@ TEST(DBTest, TransactionLogIteratorCheckAfterRestart) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorCorruptedLog) { + do { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + for (int i = 0; i < 1024; i++) { + Put("key"+std::to_string(i), DummyString(10)); + } + dbfull()->Flush(FlushOptions()); + // Corrupt this log to create a gap + rocksdb::VectorLogPtr wal_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); + const auto logfilePath = dbname_ + "/" + wal_files.front()->PathName(); + ASSERT_EQ( + 0, + truncate(logfilePath.c_str(), wal_files.front()->SizeFileBytes() / 2)); + // Insert a new entry to a new log file + Put("key1025", DummyString(10)); + // Try to read from the beginning. Should stop before the gap and read less + // than 1025 entries + auto iter = OpenTransactionLogIter(0); + int count; + int last_sequence_read = ReadRecords(iter, count); + ASSERT_LT(last_sequence_read, 1025); + // Try to read past the gap, should be able to seek to key1025 + auto iter2 = OpenTransactionLogIter(last_sequence_read + 1); + ExpectRecords(1, iter2); + } while (ChangeCompactOptions()); +} + TEST(DBTest, TransactionLogIteratorBatchOperations) { do { Options options = OptionsForLogIterTest(); @@ -4329,7 +4367,7 @@ class ModelDB: public DB { return Status::OK(); } - virtual SequenceNumber GetLatestSequenceNumber() { + virtual SequenceNumber GetLatestSequenceNumber() const { return 0; } virtual Status GetUpdatesSince(rocksdb::SequenceNumber, diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 29e309d5fb..0871b0eb62 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -14,7 +14,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const EnvOptions& soptions, const SequenceNumber seq, std::unique_ptr files, - SequenceNumber const * const lastFlushedSequence) : + DBImpl const * const dbimpl) : dir_(dir), options_(options), soptions_(soptions), @@ -24,10 +24,10 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( isValid_(false), currentFileIndex_(0), currentBatchSeq_(0), - currentBatchCount_(0), - lastFlushedSequence_(lastFlushedSequence) { - assert(startingSequenceNumber_ <= *lastFlushedSequence_); + currentLastSeq_(0), + dbimpl_(dbimpl) { assert(files_ != nullptr); + assert(dbimpl_ != nullptr); reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); @@ -77,7 +77,7 @@ bool TransactionLogIteratorImpl::RestrictedRead( Slice* record, std::string* scratch) { // Don't read if no more complete entries to read from logs - if (currentBatchSeq_ >= *lastFlushedSequence_) { + if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { return false; } return currentLogReader_->ReadRecord(record, scratch); @@ -90,11 +90,6 @@ void TransactionLogIteratorImpl::SeekToStartSequence( Slice record; started_ = false; isValid_ = false; - if (startingSequenceNumber_ > *lastFlushedSequence_) { - currentStatus_ = Status::IOError("Looking for a sequence, " - "which is not flushed yet."); - return; - } if (files_->size() <= startFileIndex) { return; } @@ -110,8 +105,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence( continue; } UpdateCurrentWriteBatch(record); - if (currentBatchSeq_ + currentBatchCount_ - 1 >= - startingSequenceNumber_) { + if (currentLastSeq_ >= startingSequenceNumber_) { if (strict && currentBatchSeq_ != startingSequenceNumber_) { currentStatus_ = Status::Corruption("Gap in sequence number. Could not " "seek to required sequence number"); @@ -128,38 +122,57 @@ void TransactionLogIteratorImpl::SeekToStartSequence( isValid_ = false; } } + // Could not find start sequence in first file. Normally this must be the // only file. Otherwise log the error and let the iterator return next entry - if (files_->size() != 1) { + // If strict is set, we want to seek exactly till the start sequence and it + // should have been present in the file we scanned above + if (strict) { + currentStatus_ = Status::Corruption("Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(currentStatus_.ToString().c_str()); + } else if (files_->size() != 1) { currentStatus_ = Status::Corruption("Start sequence was not found, " "skipping to the next available"); - reporter_.Corruption(0, currentStatus_); - started_ = true; // Let Next find next available entry - Next(); + reporter_.Info(currentStatus_.ToString().c_str()); + // Let NextImpl find the next available entry. started_ remains false + // because we don't want to check for gaps while moving to start sequence + NextImpl(true); } } void TransactionLogIteratorImpl::Next() { + return NextImpl(false); +} + +void TransactionLogIteratorImpl::NextImpl(bool internal) { std::string scratch; Slice record; isValid_ = false; - if (!started_) { // Runs every time until we can seek to the start sequence + if (!internal && !started_) { + // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } while(true) { assert(currentLogReader_); - if (currentBatchSeq_ < *lastFlushedSequence_) { - if (currentLogReader_->IsEOF()) { - currentLogReader_->UnmarkEOF(); - } - while (currentLogReader_->ReadRecord(&record, &scratch)) { - if (record.size() < 12) { - reporter_.Corruption( - record.size(), Status::Corruption("very small log record")); - continue; - } else { - return UpdateCurrentWriteBatch(record); + if (currentLogReader_->IsEOF()) { + currentLogReader_->UnmarkEOF(); + } + while (RestrictedRead(&record, &scratch)) { + if (record.size() < 12) { + reporter_.Corruption( + record.size(), Status::Corruption("very small log record")); + continue; + } else { + // started_ should be true if called by application + assert(internal || started_); + // started_ should be false if called internally + assert(!internal || !started_); + UpdateCurrentWriteBatch(record); + if (internal && !started_) { + started_ = true; } + return; } } @@ -174,27 +187,27 @@ void TransactionLogIteratorImpl::Next() { } } else { isValid_ = false; - if (currentBatchSeq_ == *lastFlushedSequence_) { + if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { currentStatus_ = Status::OK(); } else { - currentStatus_ = Status::IOError(" NO MORE DATA LEFT"); + currentStatus_ = Status::IOError("NO MORE DATA LEFT"); } return; } } } -bool TransactionLogIteratorImpl::IsBatchContinuous( +bool TransactionLogIteratorImpl::IsBatchExpected( const WriteBatch* batch, const SequenceNumber expectedSeq) { assert(batch); SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch); - if (started_ && batchSeq != expectedSeq) { + if (batchSeq != expectedSeq) { char buf[200]; snprintf(buf, sizeof(buf), "Discontinuity in log records. Got seq=%lu, Expected seq=%lu, " - "Last flushed seq=%lu. Log iterator will seek the correct batch.", - batchSeq, expectedSeq, *lastFlushedSequence_); + "Last flushed seq=%lu.Log iterator will reseek the correct batch.", + batchSeq, expectedSeq, dbimpl_->GetLatestSequenceNumber()); reporter_.Info(buf); return false; } @@ -205,8 +218,9 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { WriteBatch* batch = new WriteBatch(); WriteBatchInternal::SetContents(batch, record); - SequenceNumber expectedSeq = currentBatchSeq_ + currentBatchCount_; - if (!IsBatchContinuous(batch, expectedSeq)) { + SequenceNumber expectedSeq = currentLastSeq_ + 1; + // If the iterator has started, then confirm that we get continuous batches + if (started_ && !IsBatchExpected(batch, expectedSeq)) { // Seek to the batch having expected sequence number if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) { // Expected batch must lie in the previous log file @@ -214,13 +228,15 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { currentFileIndex_ = (currentFileIndex_ >= 0) ? currentFileIndex_ : 0; } startingSequenceNumber_ = expectedSeq; + // currentStatus_ will be set to Ok if reseek succeeds + currentStatus_ = Status::NotFound("Gap in sequence numbers"); return SeekToStartSequence(currentFileIndex_, true); } currentBatchSeq_ = WriteBatchInternal::Sequence(batch); - currentBatchCount_ = WriteBatchInternal::Count(batch); + currentLastSeq_ = currentBatchSeq_ + WriteBatchInternal::Count(batch) - 1; // currentBatchSeq_ can only change here - assert(currentBatchSeq_ <= *lastFlushedSequence_); + assert(currentLastSeq_ <= dbimpl_->GetLatestSequenceNumber()); currentBatch_.reset(batch); isValid_ = true; diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index d08d192aca..f3f4ce224f 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -10,6 +10,7 @@ #include "rocksdb/options.h" #include "rocksdb/types.h" #include "rocksdb/transaction_log.h" +#include "db/db_impl.h" #include "db/log_reader.h" #include "db/filename.h" @@ -70,7 +71,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const EnvOptions& soptions, const SequenceNumber seqNum, std::unique_ptr files, - SequenceNumber const * const lastFlushedSequence); + DBImpl const * const dbimpl); virtual bool Valid(); @@ -95,16 +96,21 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; SequenceNumber currentBatchSeq_; // sequence number at start of current batch - uint64_t currentBatchCount_; // count in current batch - SequenceNumber const * const lastFlushedSequence_; + SequenceNumber currentLastSeq_; // last sequence in the current batch + DBImpl const * const dbimpl_; // The db on whose log files this iterates // Reads from transaction log only if the writebatch record has been written bool RestrictedRead(Slice* record, std::string* scratch); // Seeks to startingSequenceNumber reading from startFileIndex in files_. // If strict is set,then must get a batch starting with startingSequenceNumber void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false); - // Check if batch is continuous starting from expectedSeq, else return false - bool IsBatchContinuous(const WriteBatch* batch, SequenceNumber expectedSeq); + // Implementation of Next. SeekToStartSequence calls it internally with + // internal=true to let it find next entry even if it has to jump gaps because + // the iterator may start off from the first available entry but promises to + // be continuous after that + void NextImpl(bool internal = false); + // Check if batch is expected, else return false + bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expectedSeq); // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 06e6b9bb3e..25614a91c1 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -258,19 +258,15 @@ class DB { virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0; // The sequence number of the most recent transaction. - virtual SequenceNumber GetLatestSequenceNumber() = 0; + virtual SequenceNumber GetLatestSequenceNumber() const = 0; - // Return's an iterator for all writes since the sequence number - // Status::ok if iterator is valid. - // The iterator internally holds references to the available log files. - // It automatically takes care of closing a file with no-updates left, and - // opening the next one. - // If the sequence number is non existent. it returns an iterator at a seq_no - // just greater than the requested seq_no. - // Must set WAL_ttl_seconds to a large value to use this api. - // else the WAL files will get - // cleared aggressively and the iterator might keep getting invalid before - // an update is read. + // Sets iter to an iterator that is positioned at a write-batch containing + // seq_number. If the sequence number is non existent, it returns an iterator + // at the first available seq_no after the requested seq_no + // Returns Status::Ok if iterator is valid + // Must set WAL_ttl_seconds to a large value to use this api, else the WAL + // files will get cleared aggressively and the iterator might keep getting + // invalid before an update is read. virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) = 0; diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index 311b35176c..e74980af60 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -54,7 +54,9 @@ struct BatchResult { std::unique_ptr writeBatchPtr; }; -// A TransactionLogIterator is used to iterate over the Transaction's in a db. +// A TransactionLogIterator is used to iterate over the transactions in a db. +// One run of the iterator is continuous, i.e. the iterator will stop at the +// beginning of any gap in sequences class TransactionLogIterator { public: TransactionLogIterator() {} @@ -74,7 +76,7 @@ class TransactionLogIterator { virtual Status status() = 0; // If valid return's the current write_batch and the sequence number of the - // latest transaction contained in the batch. + // earliest transaction contained in the batch. // ONLY use if Valid() is true and status() is OK. virtual BatchResult GetBatch() = 0; }; diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index d867f2f95a..f15a22e12d 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -136,7 +136,7 @@ class StackableDB : public DB { return sdb_->GetLiveFiles(vec, mfs, flush_memtable); } - virtual SequenceNumber GetLatestSequenceNumber() override { + virtual SequenceNumber GetLatestSequenceNumber() const override { return sdb_->GetLatestSequenceNumber(); } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index cac1d01466..e314c3ad4f 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -272,7 +272,7 @@ Status DBWithTTL::GetLiveFiles(std::vector& vec, uint64_t* mfs, return db_->GetLiveFiles(vec, mfs, flush_memtable); } -SequenceNumber DBWithTTL::GetLatestSequenceNumber() { +SequenceNumber DBWithTTL::GetLatestSequenceNumber() const { return db_->GetLatestSequenceNumber(); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 199f7553b1..88ffbe60ec 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -80,7 +80,7 @@ class DBWithTTL : public StackableDB { virtual Status DeleteFile(std::string name); - virtual SequenceNumber GetLatestSequenceNumber(); + virtual SequenceNumber GetLatestSequenceNumber() const; virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter);