WritePrepared Txn: Test sequence number 0 is visible

Summary:
Compaction will output keys with sequence number 0, if it is visible to
earliest snapshot. Adding a test to make sure IsInSnapshot() report sequence number 0 is
visible to any snapshot.
Closes https://github.com/facebook/rocksdb/pull/2974

Differential Revision: D5990665

Pulled By: yiwu-arbug

fbshipit-source-id: ef50ebc777ff8ca688771f3ab598c7a609b0b65e
This commit is contained in:
Yi Wu 2017-10-05 16:25:58 -07:00 committed by Facebook Github Bot
parent 5a6ad9d52a
commit cc20ec3689
2 changed files with 64 additions and 0 deletions

View file

@ -589,6 +589,11 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
return false;

View file

@ -20,6 +20,7 @@
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
@ -283,6 +284,45 @@ class WritePreparedTransactionTest : public TransactionTest {
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Verify value of keys.
void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
const Snapshot* snapshot = nullptr) {
std::string value;
ReadOptions read_options;
read_options.snapshot = snapshot;
for (auto& kv : data) {
auto s = db->Get(read_options, kv.first, &value);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
if (kv.second != value) {
printf("key = %s\n", kv.first.c_str());
}
ASSERT_EQ(kv.second, value);
} else {
ASSERT_EQ(kv.second, "NOT_FOUND");
}
}
}
// Verify all versions of keys.
void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
std::vector<KeyVersion> versions;
ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
expected_versions.back().user_key, &versions));
ASSERT_EQ(expected_versions.size(), versions.size());
for (size_t i = 0; i < versions.size(); i++) {
ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
ASSERT_EQ(expected_versions[i].type, versions[i].type);
if (versions[i].type != kTypeDeletion &&
versions[i].type != kTypeSingleDeletion) {
ASSERT_EQ(expected_versions[i].value, versions[i].value);
}
// Range delete not supported.
assert(expected_versions[i].type != kTypeRangeDeletion);
}
}
};
// TODO(myabandeh): enable it for concurrent_prepare
@ -1304,6 +1344,25 @@ TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) {
}
}
TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
VerifyKeys({{"foo", "bar"}});
const Snapshot* snapshot = db->GetSnapshot();
ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Compaction will output keys with sequence number 0, if it is visible to
// earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is
// visible to any snapshot.
VerifyKeys({{"foo", "bar"}});
VerifyKeys({{"foo", "bar"}}, snapshot);
VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
db->ReleaseSnapshot(snapshot);
}
} // namespace rocksdb
int main(int argc, char** argv) {