Avoid manipulating const char* arrays

Summary:
We were manipulating `const char*` arrays in CompactionJob to
change the sequence number/types of keys. This patch changes
UpdateInternalKey() to use string methods to do the manipulation
and updates all calls accordingly.

Test Plan:
Added test case for UpdateInternalKey() in dbformat_test.
make && make check

Reviewers: sdong, rven, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41985
This commit is contained in:
Andres Notzli 2015-07-14 00:21:41 -07:00
parent ab137af4ba
commit ae29495e4b
4 changed files with 35 additions and 19 deletions

View File

@ -790,7 +790,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// If we have a single key to write, simply write that key. // If we have a single key to write, simply write that key.
while (true) { while (true) {
// Invariant: key,value,ikey will always be the next entry to write // Invariant: key,value,ikey will always be the next entry to write
char* kptr = (char*)key.data(); Slice newkey(key.data(), key.size());
std::string kstr; std::string kstr;
// Zeroing out the sequence number leads to better compression. // Zeroing out the sequence number leads to better compression.
@ -803,11 +803,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// make a copy because updating in place would cause problems // make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator // with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size()); kstr.assign(key.data(), key.size());
kptr = (char*)kstr.c_str(); UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); newkey = Slice(kstr);
} }
Slice newkey(kptr, key.size());
assert((key.clear(), 1)); // we do not need 'key' anymore assert((key.clear(), 1)); // we do not need 'key' anymore
// Open output file if necessary // Open output file if necessary
@ -953,8 +952,7 @@ void CompactionJob::CallCompactionFilterV2(
if (compact_->to_delete_buf_[i]) { if (compact_->to_delete_buf_[i]) {
// update the string buffer directly // update the string buffer directly
// the Slice buffer points to the updated buffer // the Slice buffer points to the updated buffer
UpdateInternalKey(&compact_->key_str_buf_[i][0], UpdateInternalKey(&compact_->key_str_buf_[i], ikey_buf[i].sequence,
compact_->key_str_buf_[i].size(), ikey_buf[i].sequence,
kTypeDeletion); kTypeDeletion);
// no value associated with delete // no value associated with delete

View File

@ -195,14 +195,17 @@ inline bool ParseInternalKey(const Slice& internal_key,
return (c <= static_cast<unsigned char>(kValueTypeForSeek)); return (c <= static_cast<unsigned char>(kValueTypeForSeek));
} }
// Update the sequence number in the internal key // Update the sequence number in the internal key.
inline void UpdateInternalKey(char* internal_key, // Guarantees not to invalidate ikey.data().
const size_t internal_key_size, inline void UpdateInternalKey(std::string* ikey,
uint64_t seq, ValueType t) { uint64_t seq, ValueType t) {
assert(internal_key_size >= 8); size_t ikey_sz = ikey->size();
char* seqtype = internal_key + internal_key_size - 8; assert(ikey_sz >= 8);
uint64_t newval = (seq << 8) | t; uint64_t newval = (seq << 8) | t;
EncodeFixed64(seqtype, newval);
// Note: Since C++11, strings are guaranteed to be stored contiguously and
// string::operator[]() is guaranteed not to change ikey.data().
EncodeFixed64(&(*ikey)[ikey_sz - 8], newval);
} }
// Get the sequence number from the internal key // Get the sequence number from the internal key

View File

@ -149,6 +149,25 @@ TEST_F(FormatTest, IterKeyOperation) {
"abcdefghijklmnopqrstuvwxyz")); "abcdefghijklmnopqrstuvwxyz"));
} }
TEST_F(FormatTest, UpdateInternalKey) {
std::string user_key("abcdefghijklmnopqrstuvwxyz");
uint64_t new_seq = 0x123456;
ValueType new_val_type = kTypeDeletion;
std::string ikey;
AppendInternalKey(&ikey, ParsedInternalKey(user_key, 100U, kTypeValue));
size_t ikey_size = ikey.size();
UpdateInternalKey(&ikey, new_seq, new_val_type);
ASSERT_EQ(ikey_size, ikey.size());
Slice in(ikey);
ParsedInternalKey decoded;
ASSERT_TRUE(ParseInternalKey(in, &decoded));
ASSERT_EQ(user_key, decoded.user_key.ToString());
ASSERT_EQ(new_seq, decoded.sequence);
ASSERT_EQ(new_val_type, decoded.type);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -128,8 +128,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
std::string& original_key = std::string& original_key =
keys_.back(); // The original key encountered keys_.back(); // The original key encountered
orig_ikey.type = kTypeValue; orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
orig_ikey.sequence, orig_ikey.type);
operands_.back() = std::move(merge_result); operands_.back() = std::move(merge_result);
} }
@ -159,8 +158,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
std::string& original_key = std::string& original_key =
keys_.back(); // The original key encountered keys_.back(); // The original key encountered
orig_ikey.type = kTypeValue; orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
orig_ikey.sequence, orig_ikey.type);
operands_.back() = std::move(merge_result); operands_.back() = std::move(merge_result);
} }
@ -223,9 +221,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
if (success_) { if (success_) {
std::string& original_key = keys_.back(); // The original key encountered std::string& original_key = keys_.back(); // The original key encountered
orig_ikey.type = kTypeValue; orig_ikey.type = kTypeValue;
UpdateInternalKey(&original_key[0], original_key.size(), UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
orig_ikey.sequence, orig_ikey.type);
operands_.back() = std::move(merge_result); operands_.back() = std::move(merge_result);
} else { } else {
RecordTick(stats, NUMBER_MERGE_FAILURES); RecordTick(stats, NUMBER_MERGE_FAILURES);