Fix MultiGet dropping memtable kv checksum corruption (#12842)

Summary:
Corruption status returned by `GetFromTable()` could be overwritten here: b6c3495a71/db/version_set.cc (L2614)

This PR fixes this issue by setting `*(s->found_final_value) = true;` in SaveValue. Also makes the handling of the return value of `GetFromTable()` more robust and added asserts in a couple places.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12842

Test Plan: Updated an existing unit test to cover MultiGet. It fails the assertion here before this PR: b6c3495a71/db/version_set.cc (L2601)

Reviewed By: anand1976

Differential Revision: D59498203

Pulled By: cbi42

fbshipit-source-id: 1f071c1b2c5b66fb71264b547a9e670d1cf592f0
This commit is contained in:
Changyu Bi 2024-08-08 13:34:11 -07:00 committed by Facebook GitHub Bot
parent d33d25f903
commit b32d899482
7 changed files with 62 additions and 10 deletions

View File

@ -2476,7 +2476,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} }
} }
if (!done && !s.ok() && !s.IsMergeInProgress()) { if (!s.ok() && !s.IsMergeInProgress() && !s.IsNotFound()) {
assert(done);
ReturnAndCleanupSuperVersion(cfd, sv); ReturnAndCleanupSuperVersion(cfd, sv);
return s; return s;
} }

View File

@ -684,13 +684,14 @@ class DbMemtableKVChecksumTest : public DbKvChecksumTest {
DbMemtableKVChecksumTest() : DbKvChecksumTest() {} DbMemtableKVChecksumTest() : DbKvChecksumTest() {}
protected: protected:
const size_t kValueLenOffset = 12;
// Indices in the memtable entry that we will not corrupt. // Indices in the memtable entry that we will not corrupt.
// For memtable entry format, see comments in MemTable::Add(). // For memtable entry format, see comments in MemTable::Add().
// We do not corrupt key length and value length fields in this test // We do not corrupt key length and value length fields in this test
// case since it causes segfault and ASAN will complain. // case since it causes segfault and ASAN will complain.
// For this test case, key and value are all of length 3, so // For this test case, key and value are all of length 3, so
// key length field is at index 0 and value length field is at index 12. // key length field is at index 0 and value length field is at index 12.
const std::set<size_t> index_not_to_corrupt{0, 12}; const std::set<size_t> index_not_to_corrupt{0, kValueLenOffset};
void SkipNotToCorruptEntry() { void SkipNotToCorruptEntry() {
if (index_not_to_corrupt.find(corrupt_byte_offset_) != if (index_not_to_corrupt.find(corrupt_byte_offset_) !=
@ -737,6 +738,8 @@ TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) {
buf[corrupt_byte_offset_] += corrupt_byte_addend_; buf[corrupt_byte_offset_] += corrupt_byte_addend_;
++corrupt_byte_offset_; ++corrupt_byte_offset_;
}); });
// Corrupt value only so that MultiGet below can find the key.
corrupt_byte_offset_ = kValueLenOffset + 1;
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions(); Options options = CurrentOptions();
options.memtable_protection_bytes_per_key = options.memtable_protection_bytes_per_key =
@ -745,12 +748,17 @@ TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) {
options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.merge_operator = MergeOperators::CreateStringAppendOperator();
} }
std::string key = "key";
SkipNotToCorruptEntry(); SkipNotToCorruptEntry();
while (MoreBytesToCorrupt()) { while (MoreBytesToCorrupt()) {
Reopen(options); Reopen(options);
ASSERT_OK(ExecuteWrite(nullptr)); ASSERT_OK(ExecuteWrite(nullptr));
std::string val; std::string val;
ASSERT_TRUE(db_->Get(ReadOptions(), "key", &val).IsCorruption()); ASSERT_TRUE(db_->Get(ReadOptions(), key, &val).IsCorruption());
std::vector<std::string> vals = {val};
std::vector<Status> statuses = db_->MultiGet(
ReadOptions(), {db_->DefaultColumnFamily()}, {key}, &vals, nullptr);
ASSERT_TRUE(statuses[0].IsCorruption());
Destroy(options); Destroy(options);
SkipNotToCorruptEntry(); SkipNotToCorruptEntry();
} }

View File

@ -933,6 +933,8 @@ static bool SaveValue(void* arg, const char* entry) {
Saver* s = static_cast<Saver*>(arg); Saver* s = static_cast<Saver*>(arg);
assert(s != nullptr); assert(s != nullptr);
assert(!s->value || !s->columns); assert(!s->value || !s->columns);
assert(!*(s->found_final_value));
assert(s->status->ok() || s->status->IsMergeInProgress());
MergeContext* merge_context = s->merge_context; MergeContext* merge_context = s->merge_context;
SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq; SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
@ -966,6 +968,7 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MemTable::VerifyEntryChecksum( *(s->status) = MemTable::VerifyEntryChecksum(
entry, s->protection_bytes_per_key, s->allow_data_in_errors); entry, s->protection_bytes_per_key, s->allow_data_in_errors);
if (!s->status->ok()) { if (!s->status->ok()) {
*(s->found_final_value) = true;
ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState()); ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
// Memtable entry corrupted // Memtable entry corrupted
return false; return false;
@ -1231,6 +1234,7 @@ static bool SaveValue(void* arg, const char* entry) {
". "); ". ");
msg.append("seq: " + std::to_string(seq) + "."); msg.append("seq: " + std::to_string(seq) + ".");
} }
*(s->found_final_value) = true;
*(s->status) = Status::Corruption(msg.c_str()); *(s->status) = Status::Corruption(msg.c_str());
return false; return false;
} }
@ -1310,8 +1314,12 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
// No change to value, since we have not yet found a Put/Delete // No change to value, since we have not yet found a Put/Delete
// Propagate corruption error // Propagate corruption error
if (!found_final_value && merge_in_progress && !s->IsCorruption()) { if (!found_final_value && merge_in_progress) {
*s = Status::MergeInProgress(); if (s->ok()) {
*s = Status::MergeInProgress();
} else {
assert(s->IsMergeInProgress());
}
} }
PERF_COUNTER_ADD(get_from_memtable_count, 1); PERF_COUNTER_ADD(get_from_memtable_count, 1);
return found_final_value; return found_final_value;
@ -1348,6 +1356,7 @@ void MemTable::GetFromTable(const LookupKey& key,
saver.allow_data_in_errors = moptions_.allow_data_in_errors; saver.allow_data_in_errors = moptions_.allow_data_in_errors;
saver.protection_bytes_per_key = moptions_.protection_bytes_per_key; saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
table_->Get(key, &saver, SaveValue); table_->Get(key, &saver, SaveValue);
assert(s->ok() || s->IsMergeInProgress() || *found_final_value);
*seq = saver.seq; *seq = saver.seq;
} }
@ -1421,10 +1430,19 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
&found_final_value, &merge_in_progress); &found_final_value, &merge_in_progress);
if (!found_final_value && merge_in_progress) { if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress(); if (iter->s->ok()) {
*(iter->s) = Status::MergeInProgress();
} else {
assert(iter->s->IsMergeInProgress());
}
} }
if (found_final_value) { if (found_final_value ||
(!iter->s->ok() && !iter->s->IsMergeInProgress())) {
// `found_final_value` should be set if an error/corruption occurs.
// The check on iter->s is just there in case GetFromTable() did not
// set `found_final_value` properly.
assert(found_final_value);
if (iter->value) { if (iter->value) {
iter->value->PinSelf(); iter->value->PinSelf();
range->AddValueSize(iter->value->size()); range->AddValueSize(iter->value->size());

View File

@ -249,12 +249,14 @@ class MemTable {
// If do_merge = true the default behavior which is Get value for key is // If do_merge = true the default behavior which is Get value for key is
// executed. Expected behavior is described right below. // executed. Expected behavior is described right below.
// If memtable contains a value for key, store it in *value and return true. // If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error // If memtable contains a deletion for key, store NotFound() in *status and
// in *status and return true. // return true.
// If memtable contains Merge operation as the most recent entry for a key, // If memtable contains Merge operation as the most recent entry for a key,
// and the merge process does not stop (not reaching a value or delete), // and the merge process does not stop (not reaching a value or delete),
// prepend the current merge operand to *operands. // prepend the current merge operand to *operands.
// store MergeInProgress in s, and return false. // store MergeInProgress in s, and return false.
// If an unexpected error or corruption occurs, store Corruption() or other
// error in *status and return true.
// Else, return false. // Else, return false.
// If any operation was found, its most recent sequence number // If any operation was found, its most recent sequence number
// will be stored in *seq on success (regardless of whether true/false is // will be stored in *seq on success (regardless of whether true/false is

View File

@ -181,7 +181,8 @@ bool MemTableListVersion::GetFromList(
} }
if (done) { if (done) {
assert(*seq != kMaxSequenceNumber || s->IsNotFound()); assert(*seq != kMaxSequenceNumber ||
(!s->ok() && !s->IsMergeInProgress()));
return true; return true;
} }
if (!s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) { if (!s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {

View File

@ -287,6 +287,7 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch the newly written keys // Fetch the newly written keys
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -295,6 +296,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ(value, "value1"); ASSERT_EQ(value, "value1");
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key1", 2), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key1", 2), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -303,6 +305,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -311,6 +314,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ(value, "value2.2"); ASSERT_EQ(value, "value2.2");
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key3", seq), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key3", seq), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -350,6 +354,7 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch keys via MemTableList // Fetch keys via MemTableList
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -357,6 +362,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->Get(LookupKey("key1", saved_seq), &value, found = list.current()->Get(LookupKey("key1", saved_seq), &value,
/*columns=*/nullptr, /*timestamp=*/nullptr, &s, /*columns=*/nullptr, /*timestamp=*/nullptr, &s,
&merge_context, &max_covering_tombstone_seq, &merge_context, &max_covering_tombstone_seq,
@ -365,6 +371,7 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ("value1", value); ASSERT_EQ("value1", value);
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -373,12 +380,14 @@ TEST_F(MemTableListTest, GetTest) {
ASSERT_EQ(value, "value2.3"); ASSERT_EQ(value, "value2.3");
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->Get(LookupKey("key2", 1), &value, /*columns=*/nullptr, found = list.current()->Get(LookupKey("key2", 1), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions()); &max_covering_tombstone_seq, ReadOptions());
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -438,6 +447,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch the newly written keys // Fetch the newly written keys
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -446,6 +456,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr, found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
/*timestamp*/ nullptr, &s, &merge_context, /*timestamp*/ nullptr, &s, &merge_context,
&max_covering_tombstone_seq, ReadOptions(), &max_covering_tombstone_seq, ReadOptions(),
@ -462,6 +473,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch keys via MemTableList // Fetch keys via MemTableList
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -469,6 +481,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -508,6 +521,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are present in history // Verify keys are present in history
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key1", seq), &value, /*columns=*/nullptr, LookupKey("key1", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq, /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
@ -515,6 +529,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key2", seq), &value, /*columns=*/nullptr, LookupKey("key2", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq, /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
@ -568,6 +583,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are no longer in MemTableList // Verify keys are no longer in MemTableList
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -575,6 +591,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -582,6 +599,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_FALSE(found); ASSERT_FALSE(found);
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,
@ -590,6 +608,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify that the second memtable's keys are in the history // Verify that the second memtable's keys are in the history
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key1", seq), &value, /*columns=*/nullptr, LookupKey("key1", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq, /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
@ -597,6 +616,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_TRUE(found && s.IsNotFound()); ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = list.current()->GetFromHistory( found = list.current()->GetFromHistory(
LookupKey("key3", seq), &value, /*columns=*/nullptr, LookupKey("key3", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq, /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
@ -606,6 +626,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify that key2 from the first memtable is no longer in the history // Verify that key2 from the first memtable is no longer in the history
merge_context.Clear(); merge_context.Clear();
s = Status::OK();
found = found =
list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr, list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
/*timestamp=*/nullptr, &s, &merge_context, /*timestamp=*/nullptr, &s, &merge_context,

View File

@ -0,0 +1 @@
* Fix a bug where per kv checksum corruption may be ignored in MultiGet().