mirror of https://github.com/facebook/rocksdb.git
Fix bad merge of D16791 and D16767
Summary: A bad Auto-Merge caused log buffer is flushed twice. Remove the unintended one. Test Plan: Should already be tested (the code looks the same as when I ran unit tests). Reviewers: haobo, igor Reviewed By: haobo CC: ljin, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D16821
This commit is contained in:
parent
86ba3e24e3
commit
839c8ecfcd
|
@ -4543,16 +4543,43 @@ TEST(DBTest, TransactionLogIterator) {
|
||||||
{
|
{
|
||||||
auto iter = OpenTransactionLogIter(0);
|
auto iter = OpenTransactionLogIter(0);
|
||||||
ExpectRecords(3, iter);
|
ExpectRecords(3, iter);
|
||||||
}
|
assert(!iter->IsObsolete());
|
||||||
|
iter->Next();
|
||||||
|
assert(!iter->Valid());
|
||||||
|
assert(!iter->IsObsolete());
|
||||||
|
assert(iter->status().ok());
|
||||||
|
|
||||||
Reopen(&options);
|
Reopen(&options);
|
||||||
env_->SleepForMicroseconds(2 * 1000 * 1000);{
|
env_->SleepForMicroseconds(2 * 1000 * 1000);
|
||||||
Put("key4", DummyString(1024));
|
Put("key4", DummyString(1024));
|
||||||
Put("key5", DummyString(1024));
|
Put("key5", DummyString(1024));
|
||||||
Put("key6", DummyString(1024));
|
Put("key6", DummyString(1024));
|
||||||
|
|
||||||
|
iter->Next();
|
||||||
|
assert(!iter->Valid());
|
||||||
|
assert(iter->IsObsolete());
|
||||||
|
assert(iter->status().ok());
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
auto iter = OpenTransactionLogIter(0);
|
auto iter = OpenTransactionLogIter(0);
|
||||||
ExpectRecords(6, iter);
|
ExpectRecords(6, iter);
|
||||||
|
assert(!iter->IsObsolete());
|
||||||
|
iter->Next();
|
||||||
|
assert(!iter->Valid());
|
||||||
|
assert(!iter->IsObsolete());
|
||||||
|
assert(iter->status().ok());
|
||||||
|
|
||||||
|
Put("key7", DummyString(1024));
|
||||||
|
iter->Next();
|
||||||
|
assert(iter->Valid());
|
||||||
|
assert(iter->status().ok());
|
||||||
|
|
||||||
|
dbfull()->Flush(FlushOptions());
|
||||||
|
Put("key8", DummyString(1024));
|
||||||
|
iter->Next();
|
||||||
|
assert(!iter->Valid());
|
||||||
|
assert(iter->IsObsolete());
|
||||||
|
assert(iter->status().ok());
|
||||||
}
|
}
|
||||||
} while (ChangeCompactOptions());
|
} while (ChangeCompactOptions());
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
|
||||||
files_(std::move(files)),
|
files_(std::move(files)),
|
||||||
started_(false),
|
started_(false),
|
||||||
isValid_(false),
|
isValid_(false),
|
||||||
|
is_obsolete_(false),
|
||||||
currentFileIndex_(0),
|
currentFileIndex_(0),
|
||||||
currentBatchSeq_(0),
|
currentBatchSeq_(0),
|
||||||
currentLastSeq_(0),
|
currentLastSeq_(0),
|
||||||
|
@ -69,14 +70,15 @@ bool TransactionLogIteratorImpl::Valid() {
|
||||||
return started_ && isValid_;
|
return started_ && isValid_;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TransactionLogIteratorImpl::RestrictedRead(
|
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record,
|
||||||
Slice* record,
|
|
||||||
std::string* scratch) {
|
std::string* scratch) {
|
||||||
// Don't read if no more complete entries to read from logs
|
bool ret = currentLogReader_->ReadRecord(record, scratch);
|
||||||
if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) {
|
|
||||||
return false;
|
if (!reporter_.last_status.ok()) {
|
||||||
|
currentStatus_ = reporter_.last_status;
|
||||||
}
|
}
|
||||||
return currentLogReader_->ReadRecord(record, scratch);
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TransactionLogIteratorImpl::SeekToStartSequence(
|
void TransactionLogIteratorImpl::SeekToStartSequence(
|
||||||
|
@ -86,6 +88,7 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
|
||||||
Slice record;
|
Slice record;
|
||||||
started_ = false;
|
started_ = false;
|
||||||
isValid_ = false;
|
isValid_ = false;
|
||||||
|
is_obsolete_ = false;
|
||||||
if (files_->size() <= startFileIndex) {
|
if (files_->size() <= startFileIndex) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -94,6 +97,18 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
|
||||||
currentStatus_ = s;
|
currentStatus_ = s;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
auto latest_seq_num = dbimpl_->GetLatestSequenceNumber();
|
||||||
|
if (startingSequenceNumber_ > latest_seq_num) {
|
||||||
|
if (strict) {
|
||||||
|
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
|
||||||
|
"seek to required sequence number");
|
||||||
|
reporter_.Info(currentStatus_.ToString().c_str());
|
||||||
|
} else {
|
||||||
|
// isValid_ is false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (RestrictedRead(&record, &scratch)) {
|
while (RestrictedRead(&record, &scratch)) {
|
||||||
if (record.size() < 12) {
|
if (record.size() < 12) {
|
||||||
reporter_.Corruption(
|
reporter_.Corruption(
|
||||||
|
@ -123,11 +138,11 @@ void TransactionLogIteratorImpl::SeekToStartSequence(
|
||||||
// only file. Otherwise log the error and let the iterator return next entry
|
// only file. Otherwise log the error and let the iterator return next entry
|
||||||
// If strict is set, we want to seek exactly till the start sequence and it
|
// If strict is set, we want to seek exactly till the start sequence and it
|
||||||
// should have been present in the file we scanned above
|
// should have been present in the file we scanned above
|
||||||
if (strict) {
|
if (strict || files_->size() == 1) {
|
||||||
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
|
currentStatus_ = Status::Corruption("Gap in sequence number. Could not "
|
||||||
"seek to required sequence number");
|
"seek to required sequence number");
|
||||||
reporter_.Info(currentStatus_.ToString().c_str());
|
reporter_.Info(currentStatus_.ToString().c_str());
|
||||||
} else if (files_->size() != 1) {
|
} else {
|
||||||
currentStatus_ = Status::Corruption("Start sequence was not found, "
|
currentStatus_ = Status::Corruption("Start sequence was not found, "
|
||||||
"skipping to the next available");
|
"skipping to the next available");
|
||||||
reporter_.Info(currentStatus_.ToString().c_str());
|
reporter_.Info(currentStatus_.ToString().c_str());
|
||||||
|
@ -149,11 +164,30 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
|
||||||
// Runs every time until we can seek to the start sequence
|
// Runs every time until we can seek to the start sequence
|
||||||
return SeekToStartSequence();
|
return SeekToStartSequence();
|
||||||
}
|
}
|
||||||
while(true) {
|
|
||||||
|
is_obsolete_ = false;
|
||||||
|
auto latest_seq_num = dbimpl_->GetLatestSequenceNumber();
|
||||||
|
if (currentLastSeq_ >= latest_seq_num) {
|
||||||
|
isValid_ = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool first = true;
|
||||||
|
while (currentFileIndex_ < files_->size()) {
|
||||||
|
if (!first) {
|
||||||
|
Status status =OpenLogReader(files_->at(currentFileIndex_).get());
|
||||||
|
if (!status.ok()) {
|
||||||
|
isValid_ = false;
|
||||||
|
currentStatus_ = status;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
first = false;
|
||||||
assert(currentLogReader_);
|
assert(currentLogReader_);
|
||||||
if (currentLogReader_->IsEOF()) {
|
if (currentLogReader_->IsEOF()) {
|
||||||
currentLogReader_->UnmarkEOF();
|
currentLogReader_->UnmarkEOF();
|
||||||
}
|
}
|
||||||
|
|
||||||
while (RestrictedRead(&record, &scratch)) {
|
while (RestrictedRead(&record, &scratch)) {
|
||||||
if (record.size() < 12) {
|
if (record.size() < 12) {
|
||||||
reporter_.Corruption(
|
reporter_.Corruption(
|
||||||
|
@ -171,26 +205,14 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the next file
|
// Open the next file
|
||||||
if (currentFileIndex_ < files_->size() - 1) {
|
|
||||||
++currentFileIndex_;
|
++currentFileIndex_;
|
||||||
Status status =OpenLogReader(files_->at(currentFileIndex_).get());
|
}
|
||||||
if (!status.ok()) {
|
|
||||||
|
// Read all the files but cannot find next record expected.
|
||||||
|
// TODO(sdong): support to auto fetch new log files from DB and continue.
|
||||||
isValid_ = false;
|
isValid_ = false;
|
||||||
currentStatus_ = status;
|
is_obsolete_ = true;
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
isValid_ = false;
|
|
||||||
if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) {
|
|
||||||
currentStatus_ = Status::OK();
|
|
||||||
} else {
|
|
||||||
currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TransactionLogIteratorImpl::IsBatchExpected(
|
bool TransactionLogIteratorImpl::IsBatchExpected(
|
||||||
|
|
|
@ -19,7 +19,9 @@ namespace rocksdb {
|
||||||
struct LogReporter : public log::Reader::Reporter {
|
struct LogReporter : public log::Reader::Reporter {
|
||||||
Env* env;
|
Env* env;
|
||||||
Logger* info_log;
|
Logger* info_log;
|
||||||
|
Status last_status;
|
||||||
virtual void Corruption(size_t bytes, const Status& s) {
|
virtual void Corruption(size_t bytes, const Status& s) {
|
||||||
|
last_status = s;
|
||||||
Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
|
Log(info_log, "dropping %zu bytes; %s", bytes, s.ToString().c_str());
|
||||||
}
|
}
|
||||||
virtual void Info(const char* s) {
|
virtual void Info(const char* s) {
|
||||||
|
@ -74,6 +76,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
|
||||||
|
|
||||||
virtual bool Valid();
|
virtual bool Valid();
|
||||||
|
|
||||||
|
virtual bool IsObsolete() override { return is_obsolete_; }
|
||||||
|
|
||||||
virtual void Next();
|
virtual void Next();
|
||||||
|
|
||||||
virtual Status status();
|
virtual Status status();
|
||||||
|
@ -89,6 +93,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
|
||||||
std::unique_ptr<VectorLogPtr> files_;
|
std::unique_ptr<VectorLogPtr> files_;
|
||||||
bool started_;
|
bool started_;
|
||||||
bool isValid_; // not valid when it starts of.
|
bool isValid_; // not valid when it starts of.
|
||||||
|
bool is_obsolete_;
|
||||||
Status currentStatus_;
|
Status currentStatus_;
|
||||||
size_t currentFileIndex_;
|
size_t currentFileIndex_;
|
||||||
std::unique_ptr<WriteBatch> currentBatch_;
|
std::unique_ptr<WriteBatch> currentBatch_;
|
||||||
|
|
|
@ -73,6 +73,12 @@ class TransactionLogIterator {
|
||||||
// Can read data from a valid iterator.
|
// Can read data from a valid iterator.
|
||||||
virtual bool Valid() = 0;
|
virtual bool Valid() = 0;
|
||||||
|
|
||||||
|
// IsObsolete() returns true if new log files were created. This usually
|
||||||
|
// means that the user needs to close the current iterator and create a new
|
||||||
|
// one to get the newest updates. It should happen only when mem tables are
|
||||||
|
// flushed.
|
||||||
|
virtual bool IsObsolete() = 0;
|
||||||
|
|
||||||
// Moves the iterator to the next WriteBatch.
|
// Moves the iterator to the next WriteBatch.
|
||||||
// REQUIRES: Valid() to be true.
|
// REQUIRES: Valid() to be true.
|
||||||
virtual void Next() = 0;
|
virtual void Next() = 0;
|
||||||
|
|
|
@ -67,8 +67,21 @@ static void ReplicationThreadBody(void* arg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fprintf(stderr, "Refreshing iterator\n");
|
fprintf(stderr, "Refreshing iterator\n");
|
||||||
for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
|
for (; !iter->IsObsolete(); iter->Next()) {
|
||||||
|
if (!iter->Valid()) {
|
||||||
|
if (t->stop.Acquire_Load() == nullptr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// need to wait for new rows.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
BatchResult res = iter->GetBatch();
|
BatchResult res = iter->GetBatch();
|
||||||
|
if (!iter->status().ok()) {
|
||||||
|
fprintf(stderr, "Corruption reported when reading seq no. b/w %ld",
|
||||||
|
static_cast<uint64_t>(currentSeqNum));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if (res.sequence != currentSeqNum) {
|
if (res.sequence != currentSeqNum) {
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
"Missed a seq no. b/w %ld and %ld\n",
|
"Missed a seq no. b/w %ld and %ld\n",
|
||||||
|
@ -76,6 +89,8 @@ static void ReplicationThreadBody(void* arg) {
|
||||||
(long)res.sequence);
|
(long)res.sequence);
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
t->no_read++;
|
||||||
|
currentSeqNum++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue