mirror of https://github.com/facebook/rocksdb.git
GetAllKeyVersions() to take an extra argument of `max_num_ikeys`. (#4271)
Summary: Right now, `ldb idump` can use an unbounded amount of memory if the scanned range contains a long run of tombstones. Add an option that caps the maximum number of keys collected in GetAllKeyVersions(), and push the cap down from ldb via --max_num_ikeys.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4271
Differential Revision: D9369149
Pulled By: siying
fbshipit-source-id: 7cbb797b7d2fa16573495a7e84937456d3ff25bf
parent aeed4f0749
commit 9c0c8f5ff6
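For illustration, a minimal sketch of the new call shape. This is not code from the commit: the key range ["a", "z"] and the cap of 1000 are hypothetical values, and `db` is assumed to be an already-open `rocksdb::DB*`.

```cpp
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/utilities/debug.h"

// Hedged sketch: collect every internal version of every key in the
// (hypothetical) range ["a", "z"], but stop after 1000 entries so memory
// stays bounded even across a long run of tombstones.
rocksdb::Status DumpCapped(rocksdb::DB* db,
                           std::vector<rocksdb::KeyVersion>* versions) {
  return rocksdb::GetAllKeyVersions(db, "a" /* begin_key */,
                                    "z" /* end_key */,
                                    1000 /* max_num_ikeys */, versions);
}
```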
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,6 +2,8 @@
 ## Unreleased
 ### Public API Change
 * The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons
+* GetAllKeyVersions() to take an extra argument of `max_num_ikeys`.
+
 ### New Features
 * Changes the format of index blocks by delta encoding the index values, which are the block handles. This saves the encoding of BlockHandle::offset of the non-head index entries in each restart interval. The feature is backward compatible but not forward compatible. It is disabled by default unless format_version 4 or above is used.
 * Add a new tool: trace_analyzer. Trace_analyzer analyzes the trace file generated by using trace_replay API. It can convert the binary format trace file to a human readable txt file, output the statistics of the analyzed query types such as access statistics and size statistics, combining the dumped whole key space file to analyze, support query correlation analyzing, and etc. Current supported query types are: Get, Put, Delete, SingleDelete, DeleteRange, Merge, Iterator (Seek, SeekForPrev only).
--- a/include/rocksdb/utilities/debug.h
+++ b/include/rocksdb/utilities/debug.h
@@ -31,9 +31,13 @@ struct KeyVersion {
 };
 
 // Returns listing of all versions of keys in the provided user key range.
-// The range is inclusive-inclusive, i.e., [`begin_key`, `end_key`].
+// The range is inclusive-inclusive, i.e., [`begin_key`, `end_key`], or
+// `max_num_ikeys` has been reached. Since all those keys returned will be
+// copied to memory, if the range covers too many keys, the memory usage
+// may be huge. `max_num_ikeys` can be used to cap the memory usage.
 // The result is inserted into the provided vector, `key_versions`.
 Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
+                         size_t max_num_ikeys,
                          std::vector<KeyVersion>* key_versions);
 
 }  // namespace rocksdb
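A hedged note on the capped contract above: collection simply stops once `max_num_ikeys` entries have been gathered, so a result whose size equals the cap may have been truncated, and hitting the cap exactly is the caller's only signal. A self-contained sketch (the helper name `MaybeTruncated` is ours, not part of the API):

```cpp
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/utilities/debug.h"

// Hedged sketch, not part of the commit: runs a capped dump and reports
// whether the result may have been cut off at `cap`.
bool MaybeTruncated(rocksdb::DB* db, rocksdb::Slice begin_key,
                    rocksdb::Slice end_key, size_t cap,
                    std::vector<rocksdb::KeyVersion>* versions) {
  rocksdb::Status s =
      rocksdb::GetAllKeyVersions(db, begin_key, end_key, cap, versions);
  // GetAllKeyVersions() breaks out of its scan as soon as `cap` entries
  // are collected, so a full-sized result may hide further internal keys.
  return s.ok() && versions->size() == cap;
}
```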
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -1237,7 +1237,7 @@ void InternalDumpCommand::DoCommand() {
 
   // Cast as DBImpl to get internal iterator
   std::vector<KeyVersion> key_versions;
-  Status st = GetAllKeyVersions(db_, from_, to_, &key_versions);
+  Status st = GetAllKeyVersions(db_, from_, to_, max_keys_, &key_versions);
   if (!st.ok()) {
     exec_state_ = LDBCommandExecuteResult::Failed(st.ToString());
     return;
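(The `max_keys_` forwarded above is InternalDumpCommand's existing key cap, so `ldb idump` now bounds the memory of `GetAllKeyVersions()` with the same limit the command already exposes.)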
--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -196,8 +196,9 @@ class BlobDBTest : public testing::Test {
       const std::map<std::string, KeyVersion> &expected_versions) {
     auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
     DB *db = blob_db_->GetRootDB();
+    const size_t kMaxKeys = 10000;
     std::vector<KeyVersion> versions;
-    GetAllKeyVersions(db, "", "", &versions);
+    GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
     ASSERT_EQ(expected_versions.size(), versions.size());
     size_t i = 0;
     for (auto &key_version : expected_versions) {
@@ -1232,7 +1233,8 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
   blob_db_->ReleaseSnapshot(snapshot);
   // Verify expired blob index are filtered.
   std::vector<KeyVersion> versions;
-  GetAllKeyVersions(blob_db_, "", "", &versions);
+  const size_t kMaxKeys = 10000;
+  GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
   ASSERT_EQ(data_after_compact.size(), versions.size());
   for (auto &version : versions) {
     ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
@@ -1262,9 +1264,11 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) {
   ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
   ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
 
+  const size_t kMaxKeys = 10000;
+
   DB *base_db = blob_db_->GetRootDB();
   std::vector<KeyVersion> versions;
-  ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
   ASSERT_EQ(2, versions.size());
   ASSERT_EQ("bar", versions[0].user_key);
   ASSERT_EQ("foo", versions[1].user_key);
@@ -1272,7 +1276,7 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) {
 
   ASSERT_OK(blob_db_->Flush(FlushOptions()));
   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
   ASSERT_EQ(2, versions.size());
   ASSERT_EQ("bar", versions[0].user_key);
   ASSERT_EQ("foo", versions[1].user_key);
@@ -1282,7 +1286,7 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) {
   blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
   blob_db_impl()->TEST_DeleteObsoleteFiles();
   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
   ASSERT_EQ(1, versions.size());
   ASSERT_EQ("bar", versions[0].user_key);
   VerifyDB({{"bar", "v2"}});
@@ -1291,7 +1295,7 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) {
   blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
   blob_db_impl()->TEST_DeleteObsoleteFiles();
   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions));
+  ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
   ASSERT_EQ(0, versions.size());
   VerifyDB({});
 }
--- a/utilities/debug.cc
+++ b/utilities/debug.cc
@@ -12,6 +12,7 @@
 namespace rocksdb {
 
 Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
+                         size_t max_num_ikeys,
                          std::vector<KeyVersion>* key_versions) {
   assert(key_versions != nullptr);
   key_versions->clear();
@@ -30,6 +31,7 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
     iter->SeekToFirst();
   }
 
+  size_t num_keys = 0;
   for (; iter->Valid(); iter->Next()) {
     ParsedInternalKey ikey;
     if (!ParseInternalKey(iter->key(), &ikey)) {
@@ -46,6 +48,9 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
         iter->value().ToString() /* _value */,
         ikey.sequence /* _sequence */,
         static_cast<int>(ikey.type) /* _type */);
+    if (++num_keys >= max_num_ikeys) {
+      break;
+    }
   }
   return Status::OK();
 }
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -493,8 +493,10 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
   // Verify all versions of keys.
   void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
     std::vector<KeyVersion> versions;
+    const size_t kMaxKeys = 100000;
     ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
-                                expected_versions.back().user_key, &versions));
+                                expected_versions.back().user_key, kMaxKeys,
+                                &versions));
     ASSERT_EQ(expected_versions.size(), versions.size());
     for (size_t i = 0; i < versions.size(); i++) {
       ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);