rocksdb/table/plain/plain_table_key_coding.cc
Hui Xiao 9a034801ce Group rocksdb.sst.read.micros stat by different user read IOActivity + misc (#11444)
Summary:
**Context/Summary:**
- Similar to https://github.com/facebook/rocksdb/pull/11288 but for user read such as `Get(), MultiGet(), DBIterator::XXX(), Verify(File)Checksum()`.
   - For this, I refactored some user-facing `MultiGet` calls in `TransactionBase` and various types of `DB` so that they call `GetImpl()` instead of the user-facing `Get()`, which lets them pass the `ReadOptions::io_activity` check (see PR conversation)
   - The new user-read stats breakdowns are guarded by `kExceptDetailedTimers`, since measurements show they cause a 4-5% regression relative to upstream/main.

- Misc
   - More refactoring: with https://github.com/facebook/rocksdb/pull/11288, we complete passing `ReadOptions/IOOptions` to FS level. So we can now replace the previously [added](https://github.com/facebook/rocksdb/pull/9424) `rate_limiter_priority` parameter in `RandomAccessFileReader`'s `Read/MultiRead/Prefetch()` with `IOOptions::rate_limiter_priority`
   - Also, `ReadAsync()` call time is measured in `SST_READ_MICRO` now

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11444

Test Plan:
- CI fake db crash/stress test
- Microbenchmarking

**Build** `make clean && ROCKSDB_NO_FBCODE=1 DEBUG_LEVEL=0 make -jN db_basic_bench`
- google benchmark version: 604f6fd3f4
- db_basic_bench_base: upstream
- db_basic_bench_pr: db_basic_bench_base + this PR
- asyncread_db_basic_bench_base: upstream + [db basic bench patch for IteratorNext](https://github.com/facebook/rocksdb/compare/main...hx235:rocksdb:micro_bench_async_read)
- asyncread_db_basic_bench_pr: asyncread_db_basic_bench_base + this PR

**Test**

Get
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{null_stat|base|pr} --benchmark_filter=DBGet/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1/negative_query:0/enable_filter:0/mmap:1/threads:1 --benchmark_repetitions=1000
```

Result
```
Coming soon
```

AsyncRead
```
TEST_TMPDIR=/dev/shm ./asyncread_db_basic_bench_{base|pr} --benchmark_filter=IteratorNext/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1/async_io:1/include_detailed_timers:0 --benchmark_repetitions=1000 > syncread_db_basic_bench_{base|pr}.out
```

Result
```
Base:
1956,1956,1968,1977,1979,1986,1988,1988,1988,1990,1991,1991,1993,1993,1993,1993,1994,1996,1997,1997,1997,1998,1999,2001,2001,2002,2004,2007,2007,2008,

PR (2.3% regression, due to measuring `SST_READ_MICRO` that wasn't measured before):
1993,2014,2016,2022,2024,2027,2027,2028,2028,2030,2031,2031,2032,2032,2038,2039,2042,2044,2044,2047,2047,2047,2048,2049,2050,2052,2052,2052,2053,2053,
```

Reviewed By: ajkr

Differential Revision: D45918925

Pulled By: hx235

fbshipit-source-id: 58a54560d9ebeb3a59b6d807639692614dad058a
2023-08-08 17:26:50 -07:00

507 lines
18 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/plain/plain_table_key_coding.h"
#include <algorithm>
#include <string>
#include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "table/plain/plain_table_factory.h"
#include "table/plain/plain_table_reader.h"
namespace ROCKSDB_NAMESPACE {
// Kind of size entry stored in the top two bits of a prefix-encoding control
// byte. EncodeSize() shifts the value left by 6, so every enumerator must fit
// in two bits.
enum PlainTableEntryType : unsigned char {
// The encoded size is the length of a complete user key.
kFullKey = 0,
// The encoded size is the length of the prefix shared with the previous key;
// another size entry (for the suffix) follows it.
kPrefixFromPreviousKey = 1,
// The encoded size is the length of the user key bytes that come after the
// shared prefix.
kKeySuffix = 2,
};
namespace {
// Control byte layout:
// The top two bits indicate the type of entry (PlainTableEntryType).
// The low six bits hold an inlined size. If all six bits are 1 (0x3F),
// overflow bytes are used: size - 0x3F is encoded as a varint32 right after
// the control byte.
const unsigned char kSizeInlineLimit = 0x3F;
// Writes the control byte for `type` / `key_size` at `out_buffer[0]`, followed
// by varint32 overflow bytes when `key_size` does not fit in the six inline
// size bits. Returns the number of bytes written into `out_buffer`
// (1 when the size is inlined, more when overflow bytes are emitted).
size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
                  char* out_buffer) {
  const bool fits_inline = key_size < static_cast<uint32_t>(kSizeInlineLimit);

  // Entry type lives in the two most significant bits.
  out_buffer[0] = type << 6;

  if (fits_inline) {
    out_buffer[0] |= static_cast<char>(key_size);
    return 1;
  }

  // All size bits set: the remainder is stored as a trailing varint32.
  out_buffer[0] |= kSizeInlineLimit;
  char* const end =
      EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
  return end - out_buffer;
}
} // namespace
// Decodes the control byte (plus any varint32 overflow bytes) located at
// `start_offset`.
//
// On success:
//   *entry_type  - entry type taken from the top two bits of the control byte
//   *key_size    - decoded size (inline value, or 0x3F + overflow varint32)
//   *bytes_read  - number of bytes consumed, always >= 1
//
// Returns the file reader's status when a read fails, and Corruption when the
// overflow varint32 cannot be decoded. The non-mmap varint reader reports that
// case as "success with zero bytes read", so it has to be checked here and not
// merely asserted; otherwise release builds would use an undefined size.
inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
                                               PlainTableEntryType* entry_type,
                                               uint32_t* key_size,
                                               uint32_t* bytes_read) {
  Slice next_byte_slice;
  bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
  if (!success) {
    return file_reader_.status();
  }

  const unsigned char control_byte =
      static_cast<unsigned char>(next_byte_slice[0]);
  // Top two bits: entry type.
  *entry_type = static_cast<PlainTableEntryType>(control_byte >> 6);
  // Low six bits: inlined size, or the "overflow follows" marker.
  const uint32_t inline_key_size = control_byte & kSizeInlineLimit;

  if (inline_key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
    *key_size = inline_key_size;
    *bytes_read = 1;
    return Status::OK();
  }

  uint32_t extra_size = 0;
  uint32_t tmp_bytes_read = 0;
  success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
                                      &tmp_bytes_read);
  if (!success) {
    return file_reader_.status();
  }
  if (tmp_bytes_read == 0) {
    // Truncated or malformed varint32 after the control byte.
    return Status::Corruption("Unexpected EOF when reading size of the key");
  }
  *key_size = kSizeInlineLimit + extra_size;
  *bytes_read = tmp_bytes_read + 1;
  return Status::OK();
}
// Appends the encoded form of internal key `key` to `file`, advancing
// `*offset` by the number of bytes appended.
//
// - kPlain encoding: with variable-length user keys the user key size is
//   written first as a varint32; then the key follows.
// - kPrefix encoding: size entries produced by EncodeSize() are written first.
//   A full key is emitted for the first key of a prefix, when the prefix
//   changes, or every `index_sparseness_` keys; otherwise only the suffix
//   after the previously stored prefix is written.
//
// When the key has sequence number 0 and type kTypeValue, only the user key
// portion is appended and PlainTableFactory::kValueTypeSeqId0 is stored at
// `meta_bytes_buf[*meta_bytes_buf_size]` (incrementing the size) in place of
// the 8-byte internal key trailer.
//
// Returns Corruption if `key` is not a parsable internal key, or the first
// failing status reported by `file->Append()`.
IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
                                         WritableFileWriter* file,
                                         uint64_t* offset, char* meta_bytes_buf,
                                         size_t* meta_bytes_buf_size) {
  ParsedInternalKey parsed_key;
  Status pik_status =
      ParseInternalKey(key, &parsed_key, false /* log_err_key */);  // TODO
  if (!pik_status.ok()) {
    return IOStatus::Corruption(pik_status.getState());
  }

  const uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);
  // Portion of the internal key that still has to be written out.
  Slice key_to_write = key;

  if (encoding_type_ == kPlain) {
    if (fixed_user_key_len_ == kPlainTableVariableLength) {
      // Variable-length keys are preceded by their length as a varint32.
      char key_size_buf[5];
      char* const end = EncodeVarint32(key_size_buf, user_key_size);
      assert(end <= key_size_buf + sizeof(key_size_buf));
      const auto len = end - key_size_buf;
      IOStatus io_s = file->Append(Slice(key_size_buf, len));
      if (!io_s.ok()) {
        return io_s;
      }
      *offset += len;
    }
  } else {
    assert(encoding_type_ == kPrefix);
    char size_bytes[12];
    size_t size_bytes_pos = 0;
    uint32_t prefix_len = 0;

    Slice prefix =
        prefix_extractor_->Transform(Slice(key.data(), user_key_size));
    const bool write_full_key =
        key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetUserKey() ||
        key_count_for_prefix_ % index_sparseness_ == 0;

    if (write_full_key) {
      key_count_for_prefix_ = 1;
      pre_prefix_.SetUserKey(prefix);
      size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
    } else {
      key_count_for_prefix_++;
      prefix_len = static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
      if (key_count_for_prefix_ == 2) {
        // The second key within a prefix also records the prefix length.
        size_bytes_pos += EncodeSize(kPrefixFromPreviousKey, prefix_len,
                                     size_bytes + size_bytes_pos);
      }
      size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
                                   size_bytes + size_bytes_pos);
    }

    IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
    if (!io_s.ok()) {
      return io_s;
    }
    *offset += size_bytes_pos;

    if (!write_full_key) {
      // Only the bytes after the shared prefix are written for this key.
      key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
    }
  }

  // Write the key bytes. For a value-type row with sequence number 0, the
  // 8-byte trailer is dropped and a one-byte marker is placed into the caller's
  // meta buffer, which saves one file append call.
  if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
    const size_t user_part_len = key_to_write.size() - 8;
    IOStatus io_s = file->Append(Slice(key_to_write.data(), user_part_len));
    if (!io_s.ok()) {
      return io_s;
    }
    *offset += user_part_len;
    meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
    *meta_bytes_buf_size += 1;
  } else {
    IOStatus io_s = file->Append(key_to_write);
    if (!io_s.ok()) {
      return io_s;
    }
    *offset += key_to_write.size();
  }
  return IOStatus::OK();
}
// Returns a Slice of `len` bytes that views `buffer`'s memory at the position
// corresponding to file offset `file_offset`. The requested range must end at
// or before the file's data end offset (asserted).
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
                                          uint32_t len) {
  assert(file_offset + len <= file_info_->data_end_offset);
  const auto pos_in_buffer = file_offset - buffer->buf_start_offset;
  const char* const data = buffer->buf.get() + pos_in_buffer;
  return Slice(data, len);
}
bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
Slice* out) {
const uint32_t kPrefetchSize = 256u;
// Try to read from buffers.
for (uint32_t i = 0; i < num_buf_; i++) {
Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
if (file_offset >= buffer->buf_start_offset &&
file_offset + len <= buffer->buf_start_offset + buffer->buf_len) {
*out = GetFromBuffer(buffer, file_offset, len);
return true;
}
}
Buffer* new_buffer;
// Data needed is not in any of the buffer. Allocate a new buffer.
if (num_buf_ < buffers_.size()) {
// Add a new buffer
new_buffer = new Buffer();
buffers_[num_buf_++].reset(new_buffer);
} else {
// Now simply replace the last buffer. Can improve the placement policy
// if needed.
new_buffer = buffers_[num_buf_ - 1].get();
}
assert(file_offset + len <= file_info_->data_end_offset);
uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
std::max(kPrefetchSize, len));
if (size_to_read > new_buffer->buf_capacity) {
new_buffer->buf.reset(new char[size_to_read]);
new_buffer->buf_capacity = size_to_read;
new_buffer->buf_len = 0;
}
Slice read_result;
// TODO: rate limit plain table reads.
Status s =
file_info_->file->Read(IOOptions(), file_offset, size_to_read,
&read_result, new_buffer->buf.get(), nullptr);
if (!s.ok()) {
status_ = s;
return false;
}
new_buffer->buf_start_offset = file_offset;
new_buffer->buf_len = size_to_read;
*out = GetFromBuffer(new_buffer, file_offset, len);
return true;
}
// Decodes a varint32 located at file offset `offset`.
//
// On return `*out` holds the decoded value and `*bytes_read` the number of
// bytes the varint occupied. `*bytes_read` is set to 0 when no complete
// varint32 could be decoded before the end of the data; callers must treat
// that as corruption / unexpected EOF. Both the mmap and the non-mmap paths
// report this condition the same way.
//
// Returns false only when the underlying (non-mmap) file read fails; the
// error is then available from status().
inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
                                               uint32_t* bytes_read) {
  if (!file_info_->is_mmap_mode) {
    return ReadVarint32NonMmap(offset, out, bytes_read);
  }

  const char* start = file_info_->file_data.data() + offset;
  const char* limit =
      file_info_->file_data.data() + file_info_->data_end_offset;
  const char* key_ptr = GetVarint32Ptr(start, limit, out);
  // GetVarint32Ptr() yields nullptr for a truncated or malformed varint. Do
  // not do pointer arithmetic on it; report "nothing decoded" instead.
  *bytes_read =
      (key_ptr != nullptr) ? static_cast<uint32_t>(key_ptr - start) : 0;
  return true;
}
// Non-mmap implementation of ReadVarint32(): reads up to kMaxVarInt32Size
// bytes at `offset` (fewer when the data end is closer) through Read() and
// decodes a varint32 from them.
//
// `*bytes_read` receives the encoded length, or 0 when the bytes do not form
// a complete varint32. Returns false only if Read() fails.
bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
                                               uint32_t* bytes_read) {
  const uint32_t kMaxVarInt32Size = 6u;
  const uint32_t bytes_to_read =
      std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);

  Slice bytes;
  if (!Read(offset, bytes_to_read, &bytes)) {
    return false;
  }

  const char* const start = bytes.data();
  const char* const limit = start + bytes.size();
  const char* const key_ptr = GetVarint32Ptr(start, limit, out);
  if (key_ptr == nullptr) {
    *bytes_read = 0;
  } else {
    *bytes_read = static_cast<uint32_t>(key_ptr - start);
  }
  return true;
}
// Reads the key stored at `file_offset` whose user key part is
// `user_key_size` bytes long, filling `*parsed_key` and adding the number of
// bytes consumed to `*bytes_read`.
//
// Two layouts are handled:
//  - user key followed by the one-byte PlainTableFactory::kValueTypeSeqId0
//    marker: the key is reported with sequence 0 and type kTypeValue,
//    `*internal_key_valid` is set to false and `*internal_key` is untouched;
//  - user key followed by the 8-byte internal key trailer: `*internal_key`
//    views the bytes read and `*internal_key_valid` is set to true.
//
// Returns the reader's status on read failure and Corruption when the internal
// key cannot be parsed.
Status PlainTableKeyDecoder::ReadInternalKey(
    uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
    uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
  // Peek at the user key plus the byte right after it.
  Slice tmp_slice;
  if (!file_reader_.Read(file_offset, user_key_size + 1, &tmp_slice)) {
    return file_reader_.status();
  }

  const bool is_seq_id0_row =
      tmp_slice[user_key_size] == PlainTableFactory::kValueTypeSeqId0;
  if (is_seq_id0_row) {
    // Special encoding for the row with seqID=0.
    parsed_key->user_key = Slice(tmp_slice.data(), user_key_size);
    parsed_key->sequence = 0;
    parsed_key->type = kTypeValue;
    *bytes_read += user_key_size + 1;
    *internal_key_valid = false;
    return Status::OK();
  }

  // Regular encoding: re-read the full internal key including its trailer.
  if (!file_reader_.Read(file_offset, user_key_size + 8, internal_key)) {
    return file_reader_.status();
  }
  *internal_key_valid = true;
  Status pik_status = ParseInternalKey(*internal_key, parsed_key,
                                       false /* log_err_key */);  // TODO
  if (!pik_status.ok()) {
    return Status::Corruption(
        Slice("Corrupted key found during next key read. "),
        pik_status.getState());
  }
  *bytes_read += user_key_size + 8;
  return Status::OK();
}
// Decodes the next key of a plain-encoded table starting at `start_offset`.
//
// With variable-length user keys the key is preceded by its length as a
// varint32; with a fixed length no length prefix is read. `*bytes_read` is
// advanced by the bytes consumed (the caller is expected to have zeroed it).
// `*parsed_key` always receives the decoded key; `*internal_key`, when
// non-null, receives the full internal key.
//
// In non-mmap mode the key is copied into `cur_key_` and the returned slices
// point into that copy, since the file reader's buffer may later be reused.
//
// Returns the reader's status on read failure and Corruption if the length
// varint or the key itself cannot be decoded.
Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
                                                  ParsedInternalKey* parsed_key,
                                                  Slice* internal_key,
                                                  uint32_t* bytes_read,
                                                  bool* /*seekable*/) {
  uint32_t user_key_size = 0;
  Status s;
  if (fixed_user_key_len_ != kPlainTableVariableLength) {
    user_key_size = fixed_user_key_len_;
  } else {
    uint32_t tmp_size = 0;
    uint32_t tmp_read = 0;
    bool success =
        file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
    if (!success) {
      return file_reader_.status();
    }
    if (tmp_read == 0) {
      // The varint reader reports an undecodable length as "success with zero
      // bytes read". Fail explicitly; continuing would decode a bogus key.
      return Status::Corruption("Unexpected EOF when reading size of the key");
    }
    user_key_size = tmp_size;
    *bytes_read = tmp_read;
  }

  // dummy initial value to avoid compiler complain
  bool decoded_internal_key_valid = true;
  Slice decoded_internal_key;
  s = ReadInternalKey(start_offset + *bytes_read, user_key_size, parsed_key,
                      bytes_read, &decoded_internal_key_valid,
                      &decoded_internal_key);
  if (!s.ok()) {
    return s;
  }

  if (!file_reader_.file_info()->is_mmap_mode) {
    // Copy the key out and repoint the results at the copy.
    cur_key_.SetInternalKey(*parsed_key);
    parsed_key->user_key =
        Slice(cur_key_.GetInternalKey().data(), user_key_size);
    if (internal_key != nullptr) {
      *internal_key = cur_key_.GetInternalKey();
    }
  } else if (internal_key != nullptr) {
    if (decoded_internal_key_valid) {
      *internal_key = decoded_internal_key;
    } else {
      // The file holds no full internal key for this row (seqID=0 encoding),
      // so build one in cur_key_.
      cur_key_.SetInternalKey(*parsed_key);
      *internal_key = cur_key_.GetInternalKey();
    }
  }
  return Status::OK();
}
// Decodes the next key of a prefix-encoded table. Reading starts at
// start_offset + *bytes_read, and *bytes_read is advanced by every byte
// consumed.
//
// The loop reads one size entry per iteration (see DecodeSize()):
//  - kFullKey: a complete key follows; it also becomes the saved user key
//    that later suffix entries take their prefix from.
//  - kPrefixFromPreviousKey: only records the prefix length in prefix_len_;
//    another size entry is expected next, so the loop runs again.
//  - kKeySuffix: the key is rebuilt in cur_key_ from the first prefix_len_
//    bytes of the saved user key plus the suffix read from the file.
//
// *parsed_key always receives the decoded key; *internal_key (if non-null)
// receives the full internal key. *seekable (if non-null) is set to false
// when a kPrefixFromPreviousKey or kKeySuffix entry is encountered.
//
// Returns a non-OK status if a read or key parse fails, and Corruption for an
// unknown entry type.
Status PlainTableKeyDecoder::NextPrefixEncodingKey(
uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
uint32_t* bytes_read, bool* seekable) {
PlainTableEntryType entry_type;
bool expect_suffix = false;
Status s;
do {
uint32_t size = 0;
// dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true;
uint32_t my_bytes_read = 0;
s = DecodeSize(start_offset + *bytes_read, &entry_type, &size,
&my_bytes_read);
if (!s.ok()) {
return s;
}
// Defensive check: no bytes consumed means the size entry could not be read.
if (my_bytes_read == 0) {
return Status::Corruption("Unexpected EOF when reading size of the key");
}
*bytes_read += my_bytes_read;
switch (entry_type) {
case kFullKey: {
expect_suffix = false;
Slice decoded_internal_key;
s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
bytes_read, &decoded_internal_key_valid,
&decoded_internal_key);
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info()->is_mmap_mode ||
(internal_key != nullptr && !decoded_internal_key_valid)) {
// In non-mmap mode, always need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
// The copy is also needed in mmap mode when the caller wants an internal
// key but the file did not contain one (decoded_internal_key_valid is
// false).
cur_key_.SetInternalKey(*parsed_key);
saved_user_key_ = cur_key_.GetUserKey();
if (!file_reader_.file_info()->is_mmap_mode) {
parsed_key->user_key =
Slice(cur_key_.GetInternalKey().data(), size);
}
if (internal_key != nullptr) {
*internal_key = cur_key_.GetInternalKey();
}
} else {
// mmap mode with a usable on-file key: hand out slices into the file
// data directly.
if (internal_key != nullptr) {
*internal_key = decoded_internal_key;
}
saved_user_key_ = parsed_key->user_key;
}
break;
}
case kPrefixFromPreviousKey: {
if (seekable != nullptr) {
*seekable = false;
}
// `size` is the length of the prefix shared with the saved user key.
prefix_len_ = size;
assert(prefix_extractor_ == nullptr ||
prefix_extractor_->Transform(saved_user_key_).size() ==
prefix_len_);
// Need read another size flag for suffix
expect_suffix = true;
break;
}
case kKeySuffix: {
expect_suffix = false;
if (seekable != nullptr) {
*seekable = false;
}
// `size` is the suffix length; parsed_key->user_key temporarily holds only
// the suffix until it is reassigned below.
Slice tmp_slice;
s = ReadInternalKey(start_offset + *bytes_read, size, parsed_key,
bytes_read, &decoded_internal_key_valid,
&tmp_slice);
if (!s.ok()) {
return s;
}
if (!file_reader_.file_info()->is_mmap_mode) {
// In non-mmap mode, we need to make a copy of keys returned to
// users, because after reading value for the key, the key might
// be invalid.
// saved_user_key_ points to cur_key_. We are making a copy of
// the prefix part to another string, and construct the current
// key from the prefix part and the suffix part back to cur_key_.
std::string tmp =
Slice(saved_user_key_.data(), prefix_len_).ToString();
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(tmp, *parsed_key);
parsed_key->user_key =
Slice(cur_key_.GetInternalKey().data(), prefix_len_ + size);
saved_user_key_ = cur_key_.GetUserKey();
} else {
// mmap mode: the prefix is passed as a slice of saved_user_key_ without an
// intermediate copy, and saved_user_key_ itself is left unchanged.
cur_key_.Reserve(prefix_len_ + size);
cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
*parsed_key);
}
// Applies to both modes (overwrites the value set in the non-mmap branch).
parsed_key->user_key = cur_key_.GetUserKey();
if (internal_key != nullptr) {
*internal_key = cur_key_.GetInternalKey();
}
break;
}
default:
return Status::Corruption("Un-identified size flag.");
}
} while (expect_suffix); // Another round if suffix is expected.
return Status::OK();
}
// Decodes the key at `start_offset` via NextKeyNoValue() and then the value
// that follows it: a varint32 value size and that many value bytes.
//
// `*value` views the value bytes and `*bytes_read` ends up as the total number
// of bytes consumed for the key, the value size and the value.
//
// Returns the key decoding status if it is not OK, the reader's status on a
// read failure, and Corruption when the value size cannot be decoded.
Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
                                     ParsedInternalKey* parsed_key,
                                     Slice* internal_key, Slice* value,
                                     uint32_t* bytes_read, bool* seekable) {
  assert(value != nullptr);
  Status s = NextKeyNoValue(start_offset, parsed_key, internal_key, bytes_read,
                            seekable);
  if (!s.ok()) {
    return s;
  }
  assert(bytes_read != nullptr);

  // Value length, stored as a varint32 right after the key.
  uint32_t value_size;
  uint32_t value_size_bytes;
  if (!file_reader_.ReadVarint32(start_offset + *bytes_read, &value_size,
                                 &value_size_bytes)) {
    return file_reader_.status();
  }
  if (value_size_bytes == 0) {
    return Status::Corruption(
        "Unexpected EOF when reading the next value's size.");
  }
  *bytes_read += value_size_bytes;

  // The value bytes themselves.
  if (!file_reader_.Read(start_offset + *bytes_read, value_size, value)) {
    return file_reader_.status();
  }
  *bytes_read += value_size;
  return s;
}
// Decodes only the key located at `start_offset`, dispatching on the table's
// encoding type. Resets `*bytes_read` to 0 and `*seekable` (when non-null) to
// true before decoding; the encoding-specific decoder then updates them.
Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
                                            ParsedInternalKey* parsed_key,
                                            Slice* internal_key,
                                            uint32_t* bytes_read,
                                            bool* seekable) {
  *bytes_read = 0;
  if (seekable != nullptr) {
    *seekable = true;
  }

  if (encoding_type_ != kPlain) {
    assert(encoding_type_ == kPrefix);
    return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
                                 bytes_read, seekable);
  }
  return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
                              bytes_read, seekable);
}
} // namespace ROCKSDB_NAMESPACE