Return Status from MemTable mutation functions (#7656)

Summary:
This PR updates `MemTable::Add()`, `MemTable::Update()`, and
`MemTable::UpdateCallback()` to return `Status` objects, and adapts the
client code in `MemTableInserter`. The goal is to prepare these
functions for key-value checksum, where we want to verify key-value
integrity while adding to memtable. After this PR, the memtable mutation
functions can report a failed integrity check by returning `Status::Corruption`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7656

Reviewed By: riversand963

Differential Revision: D24900497

Pulled By: ajkr

fbshipit-source-id: 1a7e80581e3774676f2bbba2f0a0b04890f40009
This commit is contained in:
Andrew Kryczka 2020-11-23 16:27:46 -08:00 committed by Facebook GitHub Bot
parent 0baa5055f1
commit dd6b7fc520
7 changed files with 261 additions and 240 deletions

View file

@ -146,22 +146,15 @@ TEST_F(DBMemTableTest, DuplicateSeq) {
kMaxSequenceNumber, 0 /* column_family_id */);
// Write some keys and make sure it returns false on duplicates
bool res;
res = mem->Add(seq, kTypeValue, "key", "value2");
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key", "value2");
ASSERT_FALSE(res);
ASSERT_OK(mem->Add(seq, kTypeValue, "key", "value2"));
ASSERT_TRUE(mem->Add(seq, kTypeValue, "key", "value2").IsTryAgain());
// Changing the type should still cause the duplicatae key
res = mem->Add(seq, kTypeMerge, "key", "value2");
ASSERT_FALSE(res);
ASSERT_TRUE(mem->Add(seq, kTypeMerge, "key", "value2").IsTryAgain());
// Changing the seq number will make the key fresh
res = mem->Add(seq + 1, kTypeMerge, "key", "value2");
ASSERT_TRUE(res);
ASSERT_OK(mem->Add(seq + 1, kTypeMerge, "key", "value2"));
// Test with different types for duplicate keys
res = mem->Add(seq, kTypeDeletion, "key", "");
ASSERT_FALSE(res);
res = mem->Add(seq, kTypeSingleDeletion, "key", "");
ASSERT_FALSE(res);
ASSERT_TRUE(mem->Add(seq, kTypeDeletion, "key", "").IsTryAgain());
ASSERT_TRUE(mem->Add(seq, kTypeSingleDeletion, "key", "").IsTryAgain());
// Test the duplicate keys under stress
for (int i = 0; i < 10000; i++) {
@ -169,11 +162,11 @@ TEST_F(DBMemTableTest, DuplicateSeq) {
if (!insert_dup) {
seq++;
}
res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
s = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq));
if (insert_dup) {
ASSERT_FALSE(res);
ASSERT_TRUE(s.IsTryAgain());
} else {
ASSERT_TRUE(res);
ASSERT_OK(s);
}
}
delete mem;
@ -185,10 +178,8 @@ TEST_F(DBMemTableTest, DuplicateSeq) {
mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
kMaxSequenceNumber, 0 /* column_family_id */);
// Insert a duplicate key with _ in it
res = mem->Add(seq, kTypeValue, "key_1", "value");
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key_1", "value");
ASSERT_FALSE(res);
ASSERT_OK(mem->Add(seq, kTypeValue, "key_1", "value"));
ASSERT_TRUE(mem->Add(seq, kTypeValue, "key_1", "value").IsTryAgain());
delete mem;
// Test when InsertConcurrently will be invoked
@ -197,10 +188,11 @@ TEST_F(DBMemTableTest, DuplicateSeq) {
mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
kMaxSequenceNumber, 0 /* column_family_id */);
MemTablePostProcessInfo post_process_info;
res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
ASSERT_TRUE(res);
res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info);
ASSERT_FALSE(res);
ASSERT_OK(
mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info));
ASSERT_TRUE(
mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info)
.IsTryAgain());
delete mem;
}
@ -227,8 +219,7 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
// Put 0 as the base
PutFixed64(&value, static_cast<uint64_t>(0));
bool res = mem->Add(0, kTypeValue, "key", value);
ASSERT_TRUE(res);
ASSERT_OK(mem->Add(0, kTypeValue, "key", value));
value.clear();
// Write Merge concurrently
@ -237,9 +228,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
std::string v1;
for (int seq = 1; seq < num_ops / 2; seq++) {
PutFixed64(&v1, seq);
bool res1 =
mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1);
ASSERT_TRUE(res1);
ASSERT_OK(
mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1));
v1.clear();
}
});
@ -248,9 +238,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
std::string v2;
for (int seq = num_ops / 2; seq < num_ops; seq++) {
PutFixed64(&v2, seq);
bool res2 =
mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2);
ASSERT_TRUE(res2);
ASSERT_OK(
mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2));
v2.clear();
}
});
@ -261,8 +250,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) {
ReadOptions roptions;
SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey("key", kMaxSequenceNumber);
res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, &merge_context,
&max_covering_tombstone_seq, roptions);
bool res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status,
&merge_context, &max_covering_tombstone_seq, roptions);
ASSERT_TRUE(res);
uint64_t ivalue = DecodeFixed64(Slice(value).data());
uint64_t sum = 0;

View file

@ -187,7 +187,7 @@ TEST_F(FlushJobTest, NonEmpty) {
for (int i = 1; i < 10000; ++i) {
std::string key(ToString((i + 1000) % 10000));
std::string value("value" + key);
new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
ASSERT_OK(new_mem->Add(SequenceNumber(i), kTypeValue, key, value));
if ((i + 1000) % 10000 < 9995) {
InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
inserted_keys.push_back({internal_key.Encode().ToString(), value});
@ -195,7 +195,8 @@ TEST_F(FlushJobTest, NonEmpty) {
}
{
new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
ASSERT_OK(new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995",
"9999a"));
InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"});
}
@ -222,7 +223,7 @@ TEST_F(FlushJobTest, NonEmpty) {
}
const SequenceNumber seq(i + 10001);
new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
ASSERT_OK(new_mem->Add(seq, kTypeBlobIndex, key, blob_index));
InternalKey internal_key(key, seq, kTypeBlobIndex);
inserted_keys.push_back({internal_key.Encode().ToString(), blob_index});
@ -283,8 +284,8 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
for (size_t j = 0; j < num_keys_per_table; ++j) {
std::string key(ToString(j + i * num_keys_per_table));
std::string value("value" + key);
mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key,
value);
ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue,
key, value));
}
}
@ -356,7 +357,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
for (size_t j = 0; j != num_keys_per_memtable; ++j) {
std::string key(ToString(j + i * num_keys_per_memtable));
std::string value("value" + key);
mem->Add(curr_seqno++, kTypeValue, key, value);
ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value));
}
cfd->imm()->Add(mem, &to_delete);
@ -466,7 +467,7 @@ TEST_F(FlushJobTest, Snapshots) {
for (int j = 0; j < insertions; ++j) {
std::string value(rnd.HumanReadableString(10));
auto seqno = ++current_seqno;
new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
ASSERT_OK(new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value));
// a key is visible only if:
// 1. it's the last one written (j == insertions - 1)
// 2. there's a snapshot pointing at it
@ -518,7 +519,7 @@ class FlushJobTimestampTest : public FlushJobTestBase {
Slice value) {
std::string key_str(std::move(key));
PutFixed64(&key_str, ts);
memtable->Add(seq, value_type, key_str, value);
ASSERT_OK(memtable->Add(seq, value_type, key_str, value));
}
protected:

View file

@ -480,10 +480,10 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
return {entry_count * (data_size / n), entry_count};
}
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info, void** hint) {
Status MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info, void** hint) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
@ -519,12 +519,12 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
if (UNLIKELY(!res)) {
return res;
return Status::TryAgain("key+seq exists");
}
} else {
bool res = table->InsertKey(handle);
if (UNLIKELY(!res)) {
return res;
return Status::TryAgain("key+seq exists");
}
}
@ -566,7 +566,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
? table->InsertKeyConcurrently(handle)
: table->InsertKeyWithHintConcurrently(handle, hint);
if (UNLIKELY(!res)) {
return res;
return Status::TryAgain("key+seq exists");
}
assert(post_process_info != nullptr);
@ -600,7 +600,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
is_range_del_table_empty_.store(false, std::memory_order_relaxed);
}
UpdateOldestKeyTime();
return true;
return Status::OK();
}
// Callback from MemTable::Get()
@ -973,9 +973,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
PERF_COUNTER_ADD(get_from_memtable_count, 1);
}
void MemTable::Update(SequenceNumber seq,
const Slice& key,
const Slice& value) {
Status MemTable::Update(SequenceNumber seq, const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key();
@ -1019,22 +1018,18 @@ void MemTable::Update(SequenceNumber seq,
(unsigned)(VarintLength(key_length) + key_length +
VarintLength(value.size()) + value.size()));
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
return;
return Status::OK();
}
}
}
}
// key doesn't exist
bool add_res __attribute__((__unused__));
add_res = Add(seq, kTypeValue, key, value);
// We already checked unused != seq above. In that case, Add should not fail.
assert(add_res);
// The latest value is not `kTypeValue` or key doesn't exist
return Add(seq, kTypeValue, key, value);
}
bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta) {
Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
const Slice& delta) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
@ -1088,16 +1083,17 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
}
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
UpdateFlushState();
return true;
return Status::OK();
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
Status s = Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
UpdateFlushState();
return true;
return s;
} else if (status == UpdateStatus::UPDATE_FAILED) {
// No action required. Return.
// `UPDATE_FAILED` is named incorrectly. It indicates no update
// happened. It does not indicate a failure happened.
UpdateFlushState();
return true;
return Status::OK();
}
}
default:
@ -1105,9 +1101,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
}
}
}
// If the latest value is not kTypeValue
// or key doesn't exist
return false;
// The latest value is not `kTypeValue` or key doesn't exist
return Status::NotFound();
}
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {

View file

@ -182,12 +182,13 @@ class MemTable {
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
//
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);
// Returns `Status::TryAgain` if the `seq`, `key` combination already exists
// in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
// The next attempt should try a larger value for `seq`.
Status Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);
// Used to Get value associated with key or Get Merge Operands associated
// with key.
@ -239,35 +240,34 @@ class MemTable {
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob);
// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue
// if new sizeof(new_value) <= sizeof(prev_value)
// update inplace
// else add(key, new_value)
// else add(key, new_value)
// If `key` exists in current memtable with type `kTypeValue` and the existing
// value is at least as large as the new value, updates it in-place. Otherwise
// adds the new value to the memtable out-of-place.
//
// Returns `Status::TryAgain` if the `seq`, `key` combination already exists
// in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
// The next attempt should try a larger value for `seq`.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Update(SequenceNumber seq,
const Slice& key,
const Slice& value);
Status Update(SequenceNumber seq, const Slice& key, const Slice& value);
// If prev_value for key exists, attempts to update it inplace.
// else returns false
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue
// new_value = delta(prev_value)
// if sizeof(new_value) <= sizeof(prev_value)
// update inplace
// else add(key, new_value)
// else return false
// If `key` exists in current memtable with type `kTypeValue` and the existing
// value is at least as large as the new value, updates it in-place. Otherwise
// if `key` exists in current memtable with type `kTypeValue`, adds the new
// value to the memtable out-of-place.
//
// Returns `Status::NotFound` if `key` does not exist in current memtable or
// the latest version of `key` does not have `kTypeValue`.
//
// Returns `Status::TryAgain` if the `seq`, `key` combination already exists
// in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true.
// The next attempt should try a larger value for `seq`.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
bool UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta);
Status UpdateCallback(SequenceNumber seq, const Slice& key,
const Slice& delta);
// Returns the number of successive merge entries starting from the newest
// entry for the key up to the last non-merge entry or last entry for the

View file

@ -242,10 +242,10 @@ TEST_F(MemTableListTest, GetTest) {
mem->Ref();
// Write some keys to this memtable.
mem->Add(++seq, kTypeDeletion, "key1", "");
mem->Add(++seq, kTypeValue, "key2", "value2");
mem->Add(++seq, kTypeValue, "key1", "value1");
mem->Add(++seq, kTypeValue, "key2", "value2.2");
ASSERT_OK(mem->Add(++seq, kTypeDeletion, "key1", ""));
ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2"));
// Fetch the newly written keys
merge_context.Clear();
@ -283,8 +283,8 @@ TEST_F(MemTableListTest, GetTest) {
kMaxSequenceNumber, 0 /* column_family_id */);
mem2->Ref();
mem2->Add(++seq, kTypeDeletion, "key1", "");
mem2->Add(++seq, kTypeValue, "key2", "value2.3");
ASSERT_OK(mem2->Add(++seq, kTypeDeletion, "key1", ""));
ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3"));
// Add second memtable to list
list.Add(mem2, &to_delete);
@ -359,9 +359,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
mem->Ref();
// Write some keys to this memtable.
mem->Add(++seq, kTypeDeletion, "key1", "");
mem->Add(++seq, kTypeValue, "key2", "value2");
mem->Add(++seq, kTypeValue, "key2", "value2.2");
ASSERT_OK(mem->Add(++seq, kTypeDeletion, "key1", ""));
ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2"));
// Fetch the newly written keys
merge_context.Clear();
@ -443,8 +443,8 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
kMaxSequenceNumber, 0 /* column_family_id */);
mem2->Ref();
mem2->Add(++seq, kTypeDeletion, "key1", "");
mem2->Add(++seq, kTypeValue, "key3", "value3");
ASSERT_OK(mem2->Add(++seq, kTypeDeletion, "key1", ""));
ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3"));
// Add second memtable to list
list.Add(mem2, &to_delete);
@ -554,11 +554,11 @@ TEST_F(MemTableListTest, FlushPendingTest) {
std::string value;
MergeContext merge_context;
mem->Add(++seq, kTypeValue, "key1", ToString(i));
mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", ToString(i)));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"));
ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""));
tables.push_back(mem);
}
@ -823,11 +823,11 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
std::string value;
mem->Add(++seq, kTypeValue, "key1", ToString(i));
mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", ToString(i)));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"));
ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"));
ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""));
elem.push_back(mem);
}

View file

@ -1425,19 +1425,21 @@ class MemTableInserter : public WriteBatch::Handler {
Status ret_status;
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
if (ret_status.ok() && rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
if (ret_status.ok()) {
MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
}
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
MaybeAdvanceSeq(batch_boundry);
return ret_status;
}
assert(ret_status.ok());
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
@ -1445,23 +1447,17 @@ class MemTableInserter : public WriteBatch::Handler {
// any kind of transactions including the ones that use seq_per_batch
assert(!seq_per_batch_ || !moptions->inplace_update_support);
if (!moptions->inplace_update_support) {
bool mem_res =
ret_status =
mem->Add(sequence_, value_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
ret_status = mem->Update(sequence_, key, value);
} else {
assert(!concurrent_memtable_writes_);
if (mem->UpdateCallback(sequence_, key, value)) {
} else {
ret_status = mem->UpdateCallback(sequence_, key, value);
if (ret_status.IsNotFound()) {
// key not found in memtable. Do sst get, update, add
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
@ -1475,50 +1471,69 @@ class MemTableInserter : public WriteBatch::Handler {
std::string merged_value;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
Status s = Status::NotSupported();
Status get_status = Status::NotSupported();
if (db_ != nullptr && recovering_log_number_ == 0) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
s = db_->Get(ropts, cf_handle, key, &prev_value);
get_status = db_->Get(ropts, cf_handle, key, &prev_value);
}
// Intentionally overwrites the `NotFound` in `ret_status`.
if (!get_status.ok() && !get_status.IsNotFound()) {
ret_status = get_status;
} else {
ret_status = Status::OK();
}
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
s.ok() ? &prev_size : nullptr,
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
bool mem_res __attribute__((__unused__));
mem_res = mem->Add(
sequence_, value_type, key, Slice(prev_buffer, prev_size));
assert(mem_res);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
bool mem_res __attribute__((__unused__));
mem_res =
mem->Add(sequence_, value_type, key, Slice(merged_value));
assert(mem_res);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
if (ret_status.ok()) {
UpdateStatus update_status;
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
if (get_status.ok()) {
update_status = moptions->inplace_callback(prev_buffer, &prev_size,
value, &merged_value);
} else {
update_status = moptions->inplace_callback(
nullptr /* existing_value */, nullptr /* existing_value_size */,
value, &merged_value);
}
if (update_status == UpdateStatus::UPDATED_INPLACE) {
assert(get_status.ok());
// prev_value is updated in-place with final value.
ret_status = mem->Add(sequence_, value_type, key,
Slice(prev_buffer, prev_size));
if (ret_status.ok()) {
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
} else if (update_status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
ret_status =
mem->Add(sequence_, value_type, key, Slice(merged_value));
if (ret_status.ok()) {
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
}
}
}
if (UNLIKELY(ret_status.IsTryAgain())) {
assert(seq_per_batch_);
const bool kBatchBoundary = true;
MaybeAdvanceSeq(kBatchBoundary);
} else if (ret_status.ok()) {
MaybeAdvanceSeq();
CheckMemtableFull();
}
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
// If `ret_status` is `TryAgain` then the next (successful) try will add
// the key to the rebuilding transaction object. If `ret_status` is
// another non-OK `Status`, then the `rebuilding_trx_` will be thrown
// away. So we only need to add to it when `ret_status.ok()`.
if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
}
// Since all Puts are logged in transaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
MaybeAdvanceSeq();
CheckMemtableFull();
return ret_status;
}
@ -1531,18 +1546,18 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& value, ValueType delete_type) {
Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
bool mem_res =
ret_status =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
if (UNLIKELY(ret_status.IsTryAgain())) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
const bool kBatchBoundary = true;
MaybeAdvanceSeq(kBatchBoundary);
} else if (ret_status.ok()) {
MaybeAdvanceSeq();
CheckMemtableFull();
}
MaybeAdvanceSeq();
CheckMemtableFull();
return ret_status;
}
@ -1555,17 +1570,18 @@ class MemTableInserter : public WriteBatch::Handler {
Status ret_status;
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
if (ret_status.ok() && rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
if (ret_status.ok()) {
MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
}
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
MaybeAdvanceSeq(batch_boundry);
return ret_status;
}
@ -1578,10 +1594,12 @@ class MemTableInserter : public WriteBatch::Handler {
(0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
// If `ret_status` is `TryAgain` then the next (successful) try will add
// the key to the rebuilding transaction object. If `ret_status` is
// another non-OK `Status`, then the `rebuilding_trx_` will be thrown
// away. So we only need to add to it when `ret_status.ok()`.
if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
}
@ -1598,27 +1616,31 @@ class MemTableInserter : public WriteBatch::Handler {
Status ret_status;
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
if (ret_status.ok() && rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
if (ret_status.ok()) {
MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
}
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
MaybeAdvanceSeq(batch_boundry);
return ret_status;
}
assert(ret_status.ok());
ret_status =
DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
// If `ret_status` is `TryAgain` then the next (successful) try will add
// the key to the rebuilding transaction object. If `ret_status` is
// another non-OK `Status`, then the `rebuilding_trx_` will be thrown
// away. So we only need to add to it when `ret_status.ok()`.
if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key);
}
@ -1636,21 +1658,22 @@ class MemTableInserter : public WriteBatch::Handler {
Status ret_status;
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
if (ret_status.ok() && rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
ret_status = WriteBatchInternal::DeleteRange(
rebuilding_trx_, column_family_id, begin_key, end_key);
assert(ret_status.ok());
// TODO(myabandeh): when transactional DeleteRange support is added,
// check if end_key must also be added.
batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
if (ret_status.ok()) {
MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
}
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
MaybeAdvanceSeq(batch_boundry);
return ret_status;
}
assert(ret_status.ok());
if (db_ != nullptr) {
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
@ -1659,6 +1682,8 @@ class MemTableInserter : public WriteBatch::Handler {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
if (!cfd->is_delete_range_supported()) {
// TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
ret_status.PermitUncheckedError();
return Status::NotSupported(
std::string("DeleteRange not supported for table type ") +
cfd->ioptions()->table_factory->Name() + " in CF " +
@ -1666,10 +1691,14 @@ class MemTableInserter : public WriteBatch::Handler {
}
int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
if (cmp > 0) {
// TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
ret_status.PermitUncheckedError();
// It's an empty range where endpoints appear mistaken. Don't bother
// applying it to the DB, and return an error to the user.
return Status::InvalidArgument("end key comes before start key");
} else if (cmp == 0) {
// TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
ret_status.PermitUncheckedError();
// It's an empty range. Don't bother applying it to the DB.
return Status::OK();
}
@ -1678,10 +1707,12 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status =
DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
// optimize for non-recovery mode
// If `ret_status` is `TryAgain` then the next (successful) try will add
// the key to the rebuilding transaction object. If `ret_status` is
// another non-OK `Status`, then the `rebuilding_trx_` will be thrown
// away. So we only need to add to it when `ret_status.ok()`.
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
ret_status = WriteBatchInternal::DeleteRange(
rebuilding_trx_, column_family_id, begin_key, end_key);
}
@ -1699,19 +1730,21 @@ class MemTableInserter : public WriteBatch::Handler {
Status ret_status;
if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
bool batch_boundry = false;
if (rebuilding_trx_ != nullptr) {
if (ret_status.ok() && rebuilding_trx_ != nullptr) {
assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit.
ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
column_family_id, key, value);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key);
if (ret_status.ok()) {
MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
}
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
MaybeAdvanceSeq(batch_boundry);
return ret_status;
}
assert(ret_status.ok());
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
@ -1756,60 +1789,60 @@ class MemTableInserter : public WriteBatch::Handler {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
db_->Get(read_options, cf_handle, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = moptions->merge_operator;
assert(merge_operator);
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics, Env::Default());
if (!merge_status.ok()) {
// Failed to merge!
// Store the delta in memtable
Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
if (!get_status.ok()) {
// Failed to read a key we know exists. Store the delta in memtable.
perform_merge = false;
} else {
// 3) Add value to memtable
assert(!concurrent_memtable_writes_);
bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = moptions->merge_operator;
assert(merge_operator);
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics, Env::Default());
if (!merge_status.ok()) {
// Failed to merge!
// Store the delta in memtable
perform_merge = false;
} else {
// 3) Add value to memtable
assert(!concurrent_memtable_writes_);
ret_status = mem->Add(sequence_, kTypeValue, key, new_value);
}
}
}
if (!perform_merge) {
// Add merge operator to memtable
bool mem_res =
// Add merge operand to memtable
ret_status =
mem->Add(sequence_, kTypeMerge, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
}
if (UNLIKELY(ret_status.IsTryAgain())) {
assert(seq_per_batch_);
const bool kBatchBoundary = true;
MaybeAdvanceSeq(kBatchBoundary);
} else if (ret_status.ok()) {
MaybeAdvanceSeq();
CheckMemtableFull();
}
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
// If `ret_status` is `TryAgain` then the next (successful) try will add
// the key to the rebuilding transaction object. If `ret_status` is
// another non-OK `Status`, then the `rebuilding_trx_` will be thrown
// away. So we only need to add to it when `ret_status.ok()`.
if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object.
ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
}
MaybeAdvanceSeq();
CheckMemtableFull();
return ret_status;
}

View file

@ -504,7 +504,10 @@ class MemTableConstructor: public Constructor {
memtable_->Ref();
int seq = 1;
for (const auto& kv : kv_map) {
memtable_->Add(seq, kTypeValue, kv.first, kv.second);
Status s = memtable_->Add(seq, kTypeValue, kv.first, kv.second);
if (!s.ok()) {
return s;
}
seq++;
}
return Status::OK();