VersionSet: GetOverlappingInputs() fix overflow and optimize. (#4385)

Summary:
This fix is for `level == 0` in `GetOverlappingInputs()`:
- In `GetOverlappingInputs()`, if `level == 0`, it has potential
risk of overflow if `i == 0`.
- Optmize process when `expand = true`, the expected complexity
can be reduced to O(n).

Signed-off-by: JiYou <jiyou09@gmail.com>
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4385

Differential Revision: D10181001

Pulled By: riversand963

fbshipit-source-id: 46eef8a1d1605c9329c164e6471cd5c5b6de16b5
This commit is contained in:
JiYou 2018-10-03 18:37:38 -07:00 committed by Facebook Github Bot
parent 1cf5deb8fd
commit a1f6142f38
1 changed files with 51 additions and 31 deletions

View File

@ -21,6 +21,7 @@
#include <string>
#include <unordered_map>
#include <vector>
#include <list>
#include "db/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
@ -2034,13 +2035,6 @@ void VersionStorageInfo::GetOverlappingInputs(
}
inputs->clear();
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
if (file_index) {
*file_index = -1;
}
@ -2051,33 +2045,59 @@ void VersionStorageInfo::GetOverlappingInputs(
return;
}
for (size_t i = 0; i < level_files_brief_[level].num_files; ) {
FdWithKeyRange* f = &(level_files_brief_[level].files[i++]);
const Slice file_start = ExtractUserKey(f->smallest_key);
const Slice file_limit = ExtractUserKey(f->largest_key);
if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {
inputs->push_back(files_[level][i-1]);
if (level == 0 && expand_range) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != nullptr
&& user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
// index stores the file index need to check.
std::list<size_t> index;
for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
index.emplace_back(i);
}
while (!index.empty()) {
bool found_overlapping_file = false;
auto iter = index.begin();
while (iter != index.end()) {
FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
const Slice file_start = ExtractUserKey(f->smallest_key);
const Slice file_limit = ExtractUserKey(f->largest_key);
if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
iter++;
} else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
iter++;
} else {
// if overlap
inputs->emplace_back(files_[level][*iter]);
found_overlapping_file = true;
// record the first file index.
if (file_index && *file_index == -1) {
*file_index = static_cast<int>(*iter);
}
// the related file is overlap, erase to avoid checking again.
iter = index.erase(iter);
if (expand_range) {
if (begin != nullptr &&
user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
}
if (end != nullptr &&
user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
}
}
} else if (file_index) {
*file_index = static_cast<int>(i) - 1;
}
}
// if all the files left are not overlap, break
if (!found_overlapping_file) {
break;
}
}
}