mirror of https://github.com/facebook/rocksdb.git
Fix locking for `ColumnFamilyOptions::inplace_update_support` (#12624)
Summary: In `SaveValue()`, the read lock needs to be obtained before `VerifyEntryChecksum()` because the KV checksum verification reads the entire value metadata+data, which is all mutable when `ColumnFamilyOptions::inplace_update_support == true`. In `MemTable::Update()`, the write lock needs to be obtained before mutating the value metadata (changing the value size) because it can be read concurrently. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12624 Test Plan: ``` $ make COMPILE_WITH_TSAN=1 -j56 db_stress ... $ python3 tools/db_crashtest.py blackbox --simple --max_key=10 --inplace_update_support=1 --interval=10 --allow_concurrent_memtable_write=0 ``` Reviewed By: cbi42 Differential Revision: D57034571 Pulled By: ajkr fbshipit-source-id: 3dddf881ad87923143acdf6bfec12ce47bb13a48
This commit is contained in:
parent
b8400c9faf
commit
933ac0e05c
|
@ -732,7 +732,7 @@ TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) {
|
|||
});
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"Memtable::SaveValue:Begin:entry", [&](void* entry) {
|
||||
"Memtable::SaveValue:Found:entry", [&](void* entry) {
|
||||
char* buf = *static_cast<char**>(entry);
|
||||
buf[corrupt_byte_offset_] += corrupt_byte_addend_;
|
||||
++corrupt_byte_offset_;
|
||||
|
@ -769,7 +769,7 @@ TEST_P(DbMemtableKVChecksumTest,
|
|||
});
|
||||
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"Memtable::SaveValue:Begin:entry", [&](void* entry) {
|
||||
"Memtable::SaveValue:Found:entry", [&](void* entry) {
|
||||
char* buf = *static_cast<char**>(entry);
|
||||
buf[corrupt_byte_offset_] += corrupt_byte_addend_;
|
||||
++corrupt_byte_offset_;
|
||||
|
|
|
@ -928,21 +928,10 @@ struct Saver {
|
|||
} // anonymous namespace
|
||||
|
||||
static bool SaveValue(void* arg, const char* entry) {
|
||||
TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Begin:entry", &entry);
|
||||
Saver* s = static_cast<Saver*>(arg);
|
||||
assert(s != nullptr);
|
||||
assert(!s->value || !s->columns);
|
||||
|
||||
if (s->protection_bytes_per_key > 0) {
|
||||
*(s->status) = MemTable::VerifyEntryChecksum(
|
||||
entry, s->protection_bytes_per_key, s->allow_data_in_errors);
|
||||
if (!s->status->ok()) {
|
||||
ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
|
||||
// Memtable entry corrupted
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
MergeContext* merge_context = s->merge_context;
|
||||
SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
|
||||
const MergeOperator* merge_operator = s->merge_operator;
|
||||
|
@ -965,6 +954,22 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
if (user_comparator->EqualWithoutTimestamp(user_key_slice,
|
||||
s->key->user_key())) {
|
||||
// Correct user key
|
||||
TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry);
|
||||
std::unique_ptr<ReadLock> read_lock;
|
||||
if (s->inplace_update_support) {
|
||||
read_lock.reset(new ReadLock(s->mem->GetLock(s->key->user_key())));
|
||||
}
|
||||
|
||||
if (s->protection_bytes_per_key > 0) {
|
||||
*(s->status) = MemTable::VerifyEntryChecksum(
|
||||
entry, s->protection_bytes_per_key, s->allow_data_in_errors);
|
||||
if (!s->status->ok()) {
|
||||
ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
|
||||
// Memtable entry corrupted
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
|
||||
ValueType type;
|
||||
SequenceNumber seq;
|
||||
|
@ -1035,10 +1040,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadLock();
|
||||
}
|
||||
|
||||
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
||||
|
||||
*(s->status) = Status::OK();
|
||||
|
@ -1049,10 +1050,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
s->columns->SetPlainValue(v);
|
||||
}
|
||||
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadUnlock();
|
||||
}
|
||||
|
||||
*(s->found_final_value) = true;
|
||||
*(s->is_blob_index) = true;
|
||||
|
||||
|
@ -1060,10 +1057,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
}
|
||||
case kTypeValue:
|
||||
case kTypeValuePreferredSeqno: {
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadLock();
|
||||
}
|
||||
|
||||
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
||||
|
||||
if (type == kTypeValuePreferredSeqno) {
|
||||
|
@ -1100,10 +1093,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
s->columns->SetPlainValue(v);
|
||||
}
|
||||
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadUnlock();
|
||||
}
|
||||
|
||||
*(s->found_final_value) = true;
|
||||
|
||||
if (s->is_blob_index != nullptr) {
|
||||
|
@ -1113,10 +1102,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
return false;
|
||||
}
|
||||
case kTypeWideColumnEntity: {
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadLock();
|
||||
}
|
||||
|
||||
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
|
||||
|
||||
*(s->status) = Status::OK();
|
||||
|
@ -1158,10 +1143,6 @@ static bool SaveValue(void* arg, const char* entry) {
|
|||
*(s->status) = s->columns->SetWideColumnValue(v);
|
||||
}
|
||||
|
||||
if (s->inplace_update_support) {
|
||||
s->mem->GetLock(s->key->user_key())->ReadUnlock();
|
||||
}
|
||||
|
||||
*(s->found_final_value) = true;
|
||||
|
||||
if (s->is_blob_index != nullptr) {
|
||||
|
@ -1499,9 +1480,9 @@ Status MemTable::Update(SequenceNumber seq, ValueType value_type,
|
|||
|
||||
// Update value, if new value size <= previous value size
|
||||
if (new_size <= prev_size) {
|
||||
WriteLock wl(GetLock(lkey.user_key()));
|
||||
char* p =
|
||||
EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
|
||||
WriteLock wl(GetLock(lkey.user_key()));
|
||||
memcpy(p, value.data(), value.size());
|
||||
assert((unsigned)((p + value.size()) - entry) ==
|
||||
(unsigned)(VarintLength(key_length) + key_length +
|
||||
|
|
|
@ -251,13 +251,14 @@ int db_stress_tool(int argc, char** argv) {
|
|||
}
|
||||
if ((FLAGS_enable_compaction_filter || FLAGS_inplace_update_support) &&
|
||||
(FLAGS_acquire_snapshot_one_in > 0 || FLAGS_compact_range_one_in > 0 ||
|
||||
FLAGS_iterpercent > 0 || FLAGS_test_batches_snapshots ||
|
||||
FLAGS_test_cf_consistency || FLAGS_check_multiget_consistency ||
|
||||
FLAGS_iterpercent > 0 || FLAGS_prefixpercent > 0 ||
|
||||
FLAGS_test_batches_snapshots || FLAGS_test_cf_consistency ||
|
||||
FLAGS_check_multiget_consistency ||
|
||||
FLAGS_check_multiget_entity_consistency)) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Error: acquire_snapshot_one_in, compact_range_one_in, iterpercent, "
|
||||
"test_batches_snapshots, test_cf_consistency, "
|
||||
"prefixpercent, test_batches_snapshots, test_cf_consistency, "
|
||||
"check_multiget_consistency, check_multiget_entity_consistency must "
|
||||
"all be 0 when using compaction filter or inplace update support\n");
|
||||
exit(1);
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
* Fixed race conditions when `ColumnFamilyOptions::inplace_update_support == true` between user overwrites and reads on the same key.
|
Loading…
Reference in New Issue