[rocksdb] 2PC double recovery bug fix

Summary:
1. prepare()
2. crash
3. recover
4. commit()
5. crash
6. data is lost

This is due to the transaction data still only residing in the WAL but because the logs were flushed on the first recovery the data is ignored on the second recovery. We must scan all logs found on recovery and only ignore redundant data at the time of replay. It is not possible to know which logs still contain relevant data at time of recovery. We cannot simply ignore a log because all of the non-2pc data it contains has already been written to L0.

The changes made to MemTableInserter are to ensure that prepared sections are still recovered even if all of the non-2pc data in that log has already been flushed to L0.

Test Plan: Provided test.

Reviewers: sdong

Subscribers: andrewkr, hermanlee4, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57729
This commit is contained in:
Reid Horuff 2016-05-05 12:48:52 -07:00
parent a657ee9a9c
commit c27061dae7
4 changed files with 90 additions and 22 deletions

View File

@ -1195,8 +1195,6 @@ Status DBImpl::Recover(
// Note that prev_log_number() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of rocksdb.
const uint64_t min_log = versions_->MinLogNumber();
const uint64_t prev_log = versions_->prev_log_number();
std::vector<std::string> filenames;
s = env_->GetChildren(db_options_.wal_dir, &filenames);
if (!s.ok()) {
@ -1213,7 +1211,7 @@ Status DBImpl::Recover(
"While creating a new Db, wal_dir contains "
"existing log file: ",
filenames[i]);
} else if ((number >= min_log) || (number == prev_log)) {
} else {
logs.push_back(number);
}
}

View File

@ -761,17 +761,17 @@ class MemTableInserter : public WriteBatch::Handler {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Put(cf_mems_->GetColumnFamilyHandle(), key, value);
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
@ -851,48 +851,50 @@ class MemTableInserter : public WriteBatch::Handler {
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Delete(cf_mems_->GetColumnFamilyHandle(), key);
return Status::OK();
}
return DeleteImpl(column_family_id, key, kTypeDeletion);
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->SingleDelete(cf_mems_->GetColumnFamilyHandle(), key);
return Status::OK();
}
return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
assert(!concurrent_memtable_writes_);
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Merge(cf_mems_->GetColumnFamilyHandle(), key, value);
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false;

View File

@ -139,6 +139,7 @@ Status TransactionDB::Open(
assert(recovered_trx->name_.length());
WriteOptions w_options;
w_options.sync = true;
TransactionOptions t_options;
Transaction* real_trx =

View File

@ -682,6 +682,73 @@ TEST_F(TransactionTest, TwoPhaseSequenceTest) {
ASSERT_EQ(value, "bar4");
}
TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) {
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = false;
ReadOptions read_options;
TransactionOptions txn_options;
std::string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("a");
ASSERT_OK(s);
// transaction put
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
// prepare
s = txn->Prepare();
ASSERT_OK(s);
delete txn;
// kill and reopen
env_->SetFilesystemActive(false);
ReOpenNoDelete();
// commit old txn
txn = db->GetTransactionByName("a");
s = txn->Commit();
ASSERT_OK(s);
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(value, "bar");
delete txn;
txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("b");
ASSERT_OK(s);
s = txn->Put(Slice("foo2"), Slice("bar2"));
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
// kill and reopen
env_->SetFilesystemActive(false);
ReOpenNoDelete();
// value is now available
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(value, "bar");
s = db->Get(read_options, "foo2", &value);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(value, "bar2");
}
TEST_F(TransactionTest, TwoPhaseLogRollingTest) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());